// Copyright 2016 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #ifndef MOJO_CORE_PORTS_NODE_H_ #define MOJO_CORE_PORTS_NODE_H_ #include <stddef.h> #include <stdint.h> #include <queue> #include <unordered_map> #include "base/component_export.h" #include "base/macros.h" #include "base/memory/ref_counted.h" #include "base/synchronization/lock.h" #include "mojo/core/ports/event.h" #include "mojo/core/ports/name.h" #include "mojo/core/ports/port.h" #include "mojo/core/ports/port_ref.h" #include "mojo/core/ports/user_data.h" namespace mojo { namespace core { namespace ports { enum : int { OK = 0, ERROR_PORT_UNKNOWN = -10, ERROR_PORT_EXISTS = -11, ERROR_PORT_STATE_UNEXPECTED = -12, ERROR_PORT_CANNOT_SEND_SELF = -13, ERROR_PORT_PEER_CLOSED = -14, ERROR_PORT_CANNOT_SEND_PEER = -15, ERROR_NOT_IMPLEMENTED = -100, }; struct PortStatus { bool has_messages; bool receiving_messages; bool peer_closed; bool peer_remote; size_t queued_message_count; size_t queued_num_bytes; }; class MessageFilter; class NodeDelegate; // A Node maintains a collection of Ports (see port.h) indexed by unique 128-bit // addresses (names), performing routing and processing of events among the // Ports within the Node and to or from other Nodes in the system. Typically // (and practically, in all uses today) there is a single Node per system // process. Thus a Node boundary effectively models a process boundary. // // New Ports can be created uninitialized using CreateUninitializedPort (and // later initialized using InitializePort), or created in a fully initialized // state using CreatePortPair(). Initialized ports have exactly one conjugate // port which is the ultimate receiver of any user messages sent by that port. // See SendUserMessage(). // // In addition to routing user message events, various control events are used // by Nodes to coordinate Port behavior and lifetime within and across Nodes. // See Event documentation for description of different types of events used by // a Node to coordinate behavior. class COMPONENT_EXPORT(MOJO_CORE_PORTS) Node { public: enum class ShutdownPolicy { DONT_ALLOW_LOCAL_PORTS, ALLOW_LOCAL_PORTS, }; // Does not take ownership of the delegate. Node(const NodeName& name, NodeDelegate* delegate); ~Node(); // Returns true iff there are no open ports referring to another node or ports // in the process of being transferred from this node to another. If this // returns false, then to ensure clean shutdown, it is necessary to keep the // node alive and continue routing messages to it via AcceptMessage. This // method may be called again after AcceptMessage to check if the Node is now // ready to be destroyed. // // If |policy| is set to |ShutdownPolicy::ALLOW_LOCAL_PORTS|, this will return // |true| even if some ports remain alive, as long as none of them are proxies // to another node. bool CanShutdownCleanly( ShutdownPolicy policy = ShutdownPolicy::DONT_ALLOW_LOCAL_PORTS); // Lookup the named port. int GetPort(const PortName& port_name, PortRef* port_ref); // Creates a port on this node. Before the port can be used, it must be // initialized using InitializePort. This method is useful for bootstrapping // a connection between two nodes. Generally, ports are created using // CreatePortPair instead. int CreateUninitializedPort(PortRef* port_ref); // Initializes a newly created port. int InitializePort(const PortRef& port_ref, const NodeName& peer_node_name, const PortName& peer_port_name); // Generates a new connected pair of ports bound to this node. These ports // are initialized and ready to go. int CreatePortPair(PortRef* port0_ref, PortRef* port1_ref); // User data associated with the port. int SetUserData(const PortRef& port_ref, scoped_refptr<UserData> user_data); int GetUserData(const PortRef& port_ref, scoped_refptr<UserData>* user_data); // Prevents further messages from being sent from this port or delivered to // this port. The port is removed, and the port's peer is notified of the // closure after it has consumed all pending messages. int ClosePort(const PortRef& port_ref); // Returns the current status of the port. int GetStatus(const PortRef& port_ref, PortStatus* port_status); // Returns the next available message on the specified port or returns a null // message if there are none available. Returns ERROR_PORT_PEER_CLOSED to // indicate that this port's peer has closed. In such cases GetMessage may // be called until it yields a null message, indicating that no more messages // may be read from the port. // // If |filter| is non-null, the next available message is returned only if it // is matched by the filter. If the provided filter does not match the next // available message, GetMessage() behaves as if there is no message // available. Ownership of |filter| is not taken, and it must outlive the // extent of this call. int GetMessage(const PortRef& port_ref, std::unique_ptr<UserMessageEvent>* message, MessageFilter* filter); // Sends a message from the specified port to its peer. Note that the message // notification may arrive synchronously (via PortStatusChanged() on the // delegate) if the peer is local to this Node. int SendUserMessage(const PortRef& port_ref, std::unique_ptr<UserMessageEvent> message); // Corresponding to NodeDelegate::ForwardEvent. int AcceptEvent(ScopedEvent event); // Called to merge two ports with each other. If you have two independent // port pairs A <=> B and C <=> D, the net result of merging B and C is a // single connected port pair A <=> D. // // Note that the behavior of this operation is undefined if either port to be // merged (B or C above) has ever been read from or written to directly, and // this must ONLY be called on one side of the merge, though it doesn't matter // which side. // // It is safe for the non-merged peers (A and D above) to be transferred, // closed, and/or written to before, during, or after the merge. int MergePorts(const PortRef& port_ref, const NodeName& destination_node_name, const PortName& destination_port_name); // Like above but merges two ports local to this node. Because both ports are // local this can also verify that neither port has been written to before the // merge. If this fails for any reason, both ports are closed. Otherwise OK // is returned and the ports' receiving peers are connected to each other. int MergeLocalPorts(const PortRef& port0_ref, const PortRef& port1_ref); // Called to inform this node that communication with another node is lost // indefinitely. This triggers cleanup of ports bound to this node. int LostConnectionToNode(const NodeName& node_name); private: // Helper to ensure that a Node always calls into its delegate safely, i.e. // without holding any internal locks. class DelegateHolder { public: DelegateHolder(Node* node, NodeDelegate* delegate); ~DelegateHolder(); NodeDelegate* operator->() const { EnsureSafeDelegateAccess(); return delegate_; } private: #if DCHECK_IS_ON() void EnsureSafeDelegateAccess() const; #else void EnsureSafeDelegateAccess() const {} #endif Node* const node_; NodeDelegate* const delegate_; DISALLOW_COPY_AND_ASSIGN(DelegateHolder); }; int OnUserMessage(std::unique_ptr<UserMessageEvent> message); int OnPortAccepted(std::unique_ptr<PortAcceptedEvent> event); int OnObserveProxy(std::unique_ptr<ObserveProxyEvent> event); int OnObserveProxyAck(std::unique_ptr<ObserveProxyAckEvent> event); int OnObserveClosure(std::unique_ptr<ObserveClosureEvent> event); int OnMergePort(std::unique_ptr<MergePortEvent> event); int AddPortWithName(const PortName& port_name, scoped_refptr<Port> port); void ErasePort(const PortName& port_name); int SendUserMessageInternal(const PortRef& port_ref, std::unique_ptr<UserMessageEvent>* message); int MergePortsInternal(const PortRef& port0_ref, const PortRef& port1_ref, bool allow_close_on_bad_state); void ConvertToProxy(Port* port, const NodeName& to_node_name, PortName* port_name, Event::PortDescriptor* port_descriptor); int AcceptPort(const PortName& port_name, const Event::PortDescriptor& port_descriptor); int PrepareToForwardUserMessage(const PortRef& forwarding_port_ref, Port::State expected_port_state, bool ignore_closed_peer, UserMessageEvent* message, NodeName* forward_to_node); int BeginProxying(const PortRef& port_ref); int ForwardUserMessagesFromProxy(const PortRef& port_ref); void InitiateProxyRemoval(const PortRef& port_ref); void TryRemoveProxy(const PortRef& port_ref); void DestroyAllPortsWithPeer(const NodeName& node_name, const PortName& port_name); const NodeName name_; const DelegateHolder delegate_; // Guards |ports_|. This must never be acquired while an individual port's // lock is held on the same thread. Conversely, individual port locks may be // acquired while this one is held. // // Because UserMessage events may execute arbitrary user code during // destruction, it is also important to ensure that such events are never // destroyed while this (or any individual Port) lock is held. base::Lock ports_lock_; std::unordered_map<PortName, scoped_refptr<Port>> ports_; DISALLOW_COPY_AND_ASSIGN(Node); }; } // namespace ports } // namespace core } // namespace mojo #endif // MOJO_CORE_PORTS_NODE_H_