// C++ source  |  279 lines  |  10.8 KB

// Copyright 2016 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#ifndef MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_
#define MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_

#include <memory>
#include <utility>

#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/memory/ref_counted.h"
#include "base/single_thread_task_runner.h"
#include "base/task_runner.h"
#include "base/threading/thread_task_runner_handle.h"
#include "mojo/public/cpp/bindings/associated_group.h"
#include "mojo/public/cpp/bindings/associated_interface_ptr.h"
#include "mojo/public/cpp/bindings/interface_ptr.h"
#include "mojo/public/cpp/bindings/message.h"

namespace mojo {

// Instances of this class may be used from any thread to serialize |Interface|
// messages and forward them elsewhere. In general you should use one of the
// ThreadSafeInterfacePtrBase helper aliases defined below, but this type may be
// useful if you need/want to manually manage the lifetime of the underlying
// proxy object which will be used to ultimately send messages.
template <typename Interface>
class ThreadSafeForwarder : public MessageReceiverWithResponder {
 public:
  using ProxyType = typename Interface::Proxy_;
  using ForwardMessageCallback = base::Callback<void(Message)>;
  using ForwardMessageWithResponderCallback =
      base::Callback<void(Message, std::unique_ptr<MessageReceiver>)>;

  // Constructs a ThreadSafeForwarder through which Messages are forwarded to
  // |forward| or |forward_with_responder| by posting to |task_runner|.
  //
  // Any message sent through this forwarding interface will dispatch its reply,
  // if any, back to the thread which called the corresponding interface method.
  ThreadSafeForwarder(
      const scoped_refptr<base::SingleThreadTaskRunner>& task_runner,
      const ForwardMessageCallback& forward,
      const ForwardMessageWithResponderCallback& forward_with_responder,
      const AssociatedGroup& associated_group)
      : proxy_(this),
        task_runner_(task_runner),
        forward_(forward),
        forward_with_responder_(forward_with_responder),
        associated_group_(associated_group) {}

  ~ThreadSafeForwarder() override {}

  ProxyType& proxy() { return proxy_; }

 private:
  // MessageReceiverWithResponder implementation:
  bool Accept(Message* message) override {
    if (!message->associated_endpoint_handles()->empty()) {
      // If this DCHECK fails, it is likely because:
      // - This is a non-associated interface pointer setup using
      //     PtrWrapper::BindOnTaskRunner(
      //         InterfacePtrInfo<InterfaceType> ptr_info);
      //   Please see the TODO in that method.
      // - This is an associated interface which hasn't been associated with a
      //   message pipe. In other words, the corresponding
      //   AssociatedInterfaceRequest hasn't been sent.
      DCHECK(associated_group_.GetController());
      message->SerializeAssociatedEndpointHandles(
          associated_group_.GetController());
    }
    task_runner_->PostTask(FROM_HERE,
                           base::Bind(forward_, base::Passed(message)));
    return true;
  }

  bool AcceptWithResponder(Message* message,
                           MessageReceiver* response_receiver) override {
    if (!message->associated_endpoint_handles()->empty()) {
      // Please see comment for the DCHECK in the previous method.
      DCHECK(associated_group_.GetController());
      message->SerializeAssociatedEndpointHandles(
          associated_group_.GetController());
    }
    auto responder = base::MakeUnique<ForwardToCallingThread>(
        base::WrapUnique(response_receiver));
    task_runner_->PostTask(
        FROM_HERE, base::Bind(forward_with_responder_, base::Passed(message),
                              base::Passed(&responder)));
    return true;
  }

  class ForwardToCallingThread : public MessageReceiver {
   public:
    explicit ForwardToCallingThread(std::unique_ptr<MessageReceiver> responder)
        : responder_(std::move(responder)),
          caller_task_runner_(base::ThreadTaskRunnerHandle::Get()) {
    }

   private:
    bool Accept(Message* message) {
      // The current instance will be deleted when this method returns, so we
      // have to relinquish the responder's ownership so it does not get
      // deleted.
      caller_task_runner_->PostTask(FROM_HERE,
          base::Bind(&ForwardToCallingThread::CallAcceptAndDeleteResponder,
                     base::Passed(std::move(responder_)),
                     base::Passed(std::move(*message))));
      return true;
    }

    static void CallAcceptAndDeleteResponder(
        std::unique_ptr<MessageReceiver> responder,
        Message message) {
      ignore_result(responder->Accept(&message));
    }

    std::unique_ptr<MessageReceiver> responder_;
    scoped_refptr<base::SingleThreadTaskRunner> caller_task_runner_;
  };

  ProxyType proxy_;
  const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
  const ForwardMessageCallback forward_;
  const ForwardMessageWithResponderCallback forward_with_responder_;
  AssociatedGroup associated_group_;

  DISALLOW_COPY_AND_ASSIGN(ThreadSafeForwarder);
};

// Ref-counted, thread-safe handle which exposes |InterfacePtrType|'s interface
// through a ThreadSafeForwarder. Interface calls made via get(), operator-> or
// operator* go to the forwarder's proxy rather than directly to an
// |InterfacePtrType|.
template <typename InterfacePtrType>
class ThreadSafeInterfacePtrBase
    : public base::RefCountedThreadSafe<
          ThreadSafeInterfacePtrBase<InterfacePtrType>> {
 public:
  using InterfaceType = typename InterfacePtrType::InterfaceType;
  using PtrInfoType = typename InterfacePtrType::PtrInfoType;

  // Wraps an already constructed |forwarder|, taking ownership of it.
  explicit ThreadSafeInterfacePtrBase(
      std::unique_ptr<ThreadSafeForwarder<InterfaceType>> forwarder)
      : forwarder_(std::move(forwarder)) {}

  // Builds a ThreadSafeInterfacePtrBase around |interface_ptr|, a
  // non-thread-safe InterfacePtrType bound to the calling thread. Messages sent
  // through the resulting thread-safe proxy are first posted to the calling
  // thread's TaskRunner and sent from there.
  static scoped_refptr<ThreadSafeInterfacePtrBase> Create(
      InterfacePtrType interface_ptr) {
    scoped_refptr<PtrWrapper> ptr_owner(
        new PtrWrapper(std::move(interface_ptr)));
    return new ThreadSafeInterfacePtrBase(ptr_owner->CreateForwarder());
  }

  // Builds a ThreadSafeInterfacePtrBase whose underlying non-thread-safe
  // InterfacePtrType gets bound to |ptr_info| on |bind_task_runner|. Messages
  // sent through the resulting thread-safe proxy are first posted to that
  // TaskRunner and sent from there.
  static scoped_refptr<ThreadSafeInterfacePtrBase> Create(
      PtrInfoType ptr_info,
      const scoped_refptr<base::SingleThreadTaskRunner>& bind_task_runner) {
    scoped_refptr<PtrWrapper> ptr_owner(new PtrWrapper(bind_task_runner));
    ptr_owner->BindOnTaskRunner(std::move(ptr_info));
    return new ThreadSafeInterfacePtrBase(ptr_owner->CreateForwarder());
  }

  // Accessors for the forwarder's proxy, viewed as |InterfaceType|.
  InterfaceType* get() {
    InterfaceType& proxy = forwarder_->proxy();
    return &proxy;
  }
  InterfaceType* operator->() { return get(); }
  InterfaceType& operator*() { return *get(); }

 private:
  friend class base::RefCountedThreadSafe<
      ThreadSafeInterfacePtrBase<InterfacePtrType>>;

  struct PtrWrapperDeleter;

  // Owner of the real |InterfacePtrType| instance, which lives on an
  // appropriate thread. The wrapper stays alive for as long as it is bound
  // into the callbacks of some ThreadSafeForwarder.
  class PtrWrapper
      : public base::RefCountedThreadSafe<PtrWrapper, PtrWrapperDeleter> {
   public:
    // Creates an unbound wrapper whose pointer will live on |task_runner|.
    explicit PtrWrapper(
        const scoped_refptr<base::SingleThreadTaskRunner>& task_runner)
        : task_runner_(task_runner) {}

    // Adopts |ptr| on the calling thread and copies its associated group.
    explicit PtrWrapper(InterfacePtrType ptr)
        : PtrWrapper(base::ThreadTaskRunnerHandle::Get()) {
      ptr_ = std::move(ptr);
      associated_group_ = *ptr_.internal_state()->associated_group();
    }

    // Records the associated group derived from |ptr_info|'s handle, then
    // posts the actual bind to |task_runner_|.
    void BindOnTaskRunner(AssociatedInterfacePtrInfo<InterfaceType> ptr_info) {
      associated_group_ = AssociatedGroup(ptr_info.handle());
      task_runner_->PostTask(
          FROM_HERE,
          base::Bind(&PtrWrapper::Bind, this, base::Passed(&ptr_info)));
    }

    // Posts the bind of |ptr_info| to |task_runner_|.
    void BindOnTaskRunner(InterfacePtrInfo<InterfaceType> ptr_info) {
      // TODO(yzhsen): At the moment we don't have a group controller
      // available. That means the user won't be able to pass associated
      // endpoints on this interface (at least not immediately). In order to fix
      // this, we need to create a MultiplexRouter immediately and bind it to
      // the interface pointer on the |task_runner_|. Therefore, MultiplexRouter
      // should be able to be created on a thread different than the one that it
      // is supposed to listen on. crbug.com/682334
      task_runner_->PostTask(
          FROM_HERE,
          base::Bind(&PtrWrapper::Bind, this, base::Passed(&ptr_info)));
    }

    // Returns a new forwarder which posts to |task_runner_| and whose callbacks
    // are bound to this wrapper.
    std::unique_ptr<ThreadSafeForwarder<InterfaceType>> CreateForwarder() {
      return base::MakeUnique<ThreadSafeForwarder<InterfaceType>>(
          task_runner_, base::Bind(&PtrWrapper::Accept, this),
          base::Bind(&PtrWrapper::AcceptWithResponder, this),
          associated_group_);
    }

   private:
    friend struct PtrWrapperDeleter;

    // Destruction goes through PtrWrapperDeleter / DeleteOnCorrectThread().
    ~PtrWrapper() = default;

    // Binds |ptr_| to |ptr_info|; expected to run on |task_runner_|.
    void Bind(PtrInfoType ptr_info) {
      DCHECK(task_runner_->RunsTasksOnCurrentThread());
      ptr_.Bind(std::move(ptr_info));
    }

    // Hands a reply-less |message| to the underlying pointer.
    void Accept(Message message) {
      auto* state = ptr_.internal_state();
      state->ForwardMessage(std::move(message));
    }

    // Hands |message| to the underlying pointer together with the |responder|
    // that should receive its reply.
    void AcceptWithResponder(Message message,
                             std::unique_ptr<MessageReceiver> responder) {
      auto* state = ptr_.internal_state();
      state->ForwardMessageWithResponder(std::move(message),
                                         std::move(responder));
    }

    // Deletes |this| if already running on |task_runner_|; otherwise re-posts
    // itself to |task_runner_|.
    void DeleteOnCorrectThread() const {
      if (task_runner_->RunsTasksOnCurrentThread()) {
        delete this;
        return;
      }
      // NOTE: This is only called when there are no more references to
      // |this|, so binding it unretained is both safe and necessary.
      task_runner_->PostTask(
          FROM_HERE, base::Bind(&PtrWrapper::DeleteOnCorrectThread,
                                base::Unretained(this)));
    }

    InterfacePtrType ptr_;
    const scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
    AssociatedGroup associated_group_;

    DISALLOW_COPY_AND_ASSIGN(PtrWrapper);
  };

  // RefCountedThreadSafe destruction trait for PtrWrapper: defers to
  // DeleteOnCorrectThread() instead of deleting in place.
  struct PtrWrapperDeleter {
    static void Destruct(const PtrWrapper* wrapper) {
      wrapper->DeleteOnCorrectThread();
    }
  };

  // Private: instances are destroyed through RefCountedThreadSafe.
  ~ThreadSafeInterfacePtrBase() = default;

  const std::unique_ptr<ThreadSafeForwarder<InterfaceType>> forwarder_;

  DISALLOW_COPY_AND_ASSIGN(ThreadSafeInterfacePtrBase);
};

// Convenience alias: ThreadSafeInterfacePtrBase instantiated with
// AssociatedInterfacePtr<Interface> as the underlying pointer type.
template <typename Interface>
using ThreadSafeAssociatedInterfacePtr =
    ThreadSafeInterfacePtrBase<AssociatedInterfacePtr<Interface>>;

// Convenience alias: ThreadSafeInterfacePtrBase instantiated with
// InterfacePtr<Interface> as the underlying pointer type.
template <typename Interface>
using ThreadSafeInterfacePtr =
    ThreadSafeInterfacePtrBase<InterfacePtr<Interface>>;

}  // namespace mojo

#endif  // MOJO_PUBLIC_CPP_BINDINGS_THREAD_SAFE_INTERFACE_PTR_H_