class P>
class RPCArgTypeCheckHelper, std::tuple<>> {
public:
static const bool value = true;
};
// Recursive case of RPCArgTypeCheckHelper.
//
// `value` is the logical AND of the predicate P's `value` and the `value` of
// a further RPCArgTypeCheckHelper instantiation, so the check fails as soon as
// any step's predicate is false. The recursion bottoms out in the
// specialization whose second tuple is std::tuple<>, whose `value` is true.
//
// NOTE(review): presumably P is applied to the head types T and U and the
// recursion continues on Ts.../Us... -- the template-argument lists are not
// visible in this text; confirm against the original header.
template class P, typename T, typename... Ts,
typename U, typename... Us>
class RPCArgTypeCheckHelper, std::tuple> {
public:
static const bool value =
P::value &&
RPCArgTypeCheckHelper, std::tuple>::value;
};
// Compile-time check of one function signature against another using the
// predicate template P.
//
// T1Tuple and T2Tuple are obtained through FunctionArgsTuple<...>::Type
// (presumably the argument tuples of T1Sig and T2Sig respectively -- the
// template arguments are not visible in this text; confirm). Two
// static_asserts compare std::tuple_size values with >= and <=, so together
// they require equal sizes and produce a targeted "Too many"/"Too few"
// diagnostic otherwise. `value` is then whatever RPCArgTypeCheckHelper
// computes.
template class P, typename T1Sig, typename T2Sig>
class RPCArgTypeCheck {
public:
using T1Tuple = typename FunctionArgsTuple::Type;
using T2Tuple = typename FunctionArgsTuple::Type;
// Size checks: the pair of asserts below only both hold when the two
// tuple sizes being compared are equal.
static_assert(std::tuple_size::value >=
std::tuple_size::value,
"Too many arguments to RPC call");
static_assert(std::tuple_size::value <=
std::tuple_size::value,
"Too few arguments to RPC call");
// Result of the helper's check, exposed as this trait's value.
static const bool value = RPCArgTypeCheckHelper::value;
};
// Trait whose `value` reports whether a serialization operation is available
// through SerializationTraits (aliased as S below).
//
// Detection uses overload resolution on check(0): the first overload is only
// viable when the std::enable_if condition holds, i.e. when the type of the
// probed expression is the same as Error; it returns std::true_type. The
// variadic fallback returns std::false_type.
//
// NOTE(review): the probed expression is presumably a call to S::serialize
// on declval'd channel/value arguments -- the template arguments and call
// are not visible in this text; confirm against the original header.
template
class CanSerialize {
private:
using S = SerializationTraits;
// Preferred overload: participates only if the probed expression is
// well-formed and has type Error.
template
static std::true_type
check(typename std::enable_if<
std::is_same(),
std::declval())),
Error>::value,
void *>::type);
// Fallback overload: chosen when the overload above is removed by SFINAE.
template static std::false_type check(...);
public:
// True iff the preferred check overload was selected for the argument 0.
static const bool value = decltype(check(0))::value;
};
// Trait whose `value` reports whether a deserialization operation is available
// through SerializationTraits (aliased as S below).
//
// Detection uses overload resolution on check(0): the first overload is only
// viable when the std::enable_if condition holds, i.e. when the type of the
// probed expression is the same as Error; it returns std::true_type. The
// variadic fallback returns std::false_type.
//
// NOTE(review): the probed expression is presumably a call to S::deserialize
// on declval'd channel/value arguments -- the template arguments and call
// are not visible in this text; confirm against the original header.
template
class CanDeserialize {
private:
using S = SerializationTraits;
// Preferred overload: participates only if the probed expression is
// well-formed and has type Error.
template
static std::true_type
check(typename std::enable_if<
std::is_same(),
std::declval())),
Error>::value,
void *>::type);
// Fallback overload: chosen when the overload above is removed by SFINAE.
template static std::false_type check(...);
public:
// True iff the preferred check overload was selected for the argument 0.
static const bool value = decltype(check(0))::value;
};
/// Contains primitive utilities for defining, calling and handling calls to
/// remote procedures. ChannelT is a bidirectional stream conforming to the
/// RPCChannel interface (see RPCChannel.h), FunctionIdT is a procedure
/// identifier type that must be serializable on ChannelT, and SequenceNumberT
/// is an integral type that will be used to number in-flight function calls.
///
/// These utilities support the construction of very primitive RPC utilities.
/// Their intent is to ensure correct serialization and deserialization of
/// procedure arguments, and to keep the client and server's view of the API in
/// sync.
template
class RPCEndpointBase {
protected:
// Built-in function descriptor derived from Function; its name is the
// reserved string "__orc_rpc$invalid".
// NOTE(review): presumably this names the "invalid function" sentinel -- no
// use of it is visible in this part of the file; confirm.
class OrcRPCInvalid : public Function {
public:
// Name reported for this built-in function.
static const char *getName() { return "__orc_rpc$invalid"; }
};
// Built-in function descriptor for responses. The RPCEndpointBase constructor
// maps this function's prototype to ResponseId in RemoteFunctionIds, and
// handleOne routes incoming messages carrying that id to handleResponse.
class OrcRPCResponse : public Function {
public:
// Name reported for this built-in function.
static const char *getName() { return "__orc_rpc$response"; }
};
// Built-in function descriptor for function-id negotiation. The
// RPCEndpointBase constructor maps this function's prototype to the
// allocator's negotiate id and installs a handler for it that forwards the
// received name to handleNegotiate.
class OrcRPCNegotiate
: public Function {
public:
// Name reported for this built-in function.
static const char *getName() { return "__orc_rpc$negotiate"; }
};
// Helper predicate for testing for the presence of SerializationTraits
// serializers.
//
// Wraps detail::CanSerialize: the base is inherited privately (class default)
// and only its `value` member is re-exported publicly. Unlike the bare trait,
// instantiating this class with a false `value` is a hard error via the
// static_assert below, which yields a readable diagnostic.
template
class CanSerializeCheck : detail::CanSerialize {
public:
using detail::CanSerialize::value;
static_assert(value, "Missing serializer for argument (Can't serialize the "
"first template type argument of CanSerializeCheck "
"from the second)");
};
// Helper predicate for testing for the presence of SerializationTraits
// deserializers.
//
// Wraps detail::CanDeserialize: the base is inherited privately (class
// default) and only its `value` member is re-exported publicly. Unlike the
// bare trait, instantiating this class with a false `value` is a hard error
// via the static_assert below, which yields a readable diagnostic.
template
class CanDeserializeCheck
: detail::CanDeserialize {
public:
using detail::CanDeserialize::value;
static_assert(value, "Missing deserializer for argument (Can't deserialize "
"the second template type argument of "
"CanDeserializeCheck from the first)");
};
public:
/// Construct an RPC instance on a channel.
///
/// \param C the channel this endpoint sends on and receives from; it is
///        stored in the member C.
/// \param LazyAutoNegotiation stored and later passed by appendCallAsync as
///        the first argument of getRemoteFunctionId when looking up the
///        remote id of a function.
///
/// The constructor also pre-registers the two built-in functions: the
/// response function (id cached in ResponseId) and the negotiate function,
/// for which a handler forwarding to handleNegotiate is installed.
RPCEndpointBase(ChannelT &C, bool LazyAutoNegotiation)
: C(C), LazyAutoNegotiation(LazyAutoNegotiation) {
// Hold ResponseId in a special variable, since we expect Response to be
// called relatively frequently, and want to avoid the map lookup.
ResponseId = FnIdAllocator.getResponseId();
RemoteFunctionIds[OrcRPCResponse::getPrototype()] = ResponseId;
// Register the negotiate function id and handler.
auto NegotiateId = FnIdAllocator.getNegotiateId();
RemoteFunctionIds[OrcRPCNegotiate::getPrototype()] = NegotiateId;
// The handler captures `this`, so it must not outlive this endpoint.
Handlers[NegotiateId] = wrapHandler(
[this](const std::string &Name) { return handleNegotiate(Name); });
}
/// Negotiate a function id for Func with the other end of the channel.
///
/// Calls getRemoteFunctionId with `true` as its first argument and \p Retry
/// as its second, discards the id and returns only the error state of the
/// result.
/// NOTE(review): the first argument presumably requests negotiation and
/// Retry presumably forces a re-negotiation -- getRemoteFunctionId's body is
/// not visible here; confirm.
template Error negotiateFunction(bool Retry = false) {
return getRemoteFunctionId(true, Retry).takeError();
}
/// Append a call to Func; does not call send on the channel.
/// The first argument specifies a user-defined handler to be run when the
/// function returns. The handler should take an Expected wrapping
/// Func::ReturnType, or an Error (if Func::ReturnType is void). The handler
/// will be called with an error if the return value is abandoned due to a
/// channel error.
///
/// \param Handler response handler; moved into the pending-responses map.
/// \param Args call arguments, serialized onto the channel.
/// \returns Error::success() once the message has been started, serialized
///          and ended; otherwise the error from function-id lookup or from
///          the failing channel operation.
///
/// Note that on any channel failure abandonPendingResponses() is called,
/// which abandons every pending response on this endpoint (including the
/// handler just installed), not only the one for this call.
template
Error appendCallAsync(HandlerT Handler, const ArgTs &... Args) {
// Compile-time check of the argument types via detail::RPCArgTypeCheck.
static_assert(
detail::RPCArgTypeCheck::value,
"");
// Look up the function ID.
FunctionIdT FnId;
if (auto FnIdOrErr = getRemoteFunctionId(LazyAutoNegotiation, false))
FnId = *FnIdOrErr;
else {
// Negotiation failed. Notify the handler then return the negotiate-failed
// error.
cantFail(Handler(make_error()));
return FnIdOrErr.takeError();
}
SequenceNumberT SeqNo; // initialized in locked scope below.
{
// Lock the pending responses map and sequence number manager.
std::lock_guard Lock(ResponsesMutex);
// Allocate a sequence number.
SeqNo = SequenceNumberMgr.getSequenceNumber();
assert(!PendingResponses.count(SeqNo) &&
"Sequence number already allocated");
// Install the user handler.
PendingResponses[SeqNo] =
detail::createResponseHandler(
std::move(Handler));
}
// The lock is released here, before any channel operation is attempted.
// Open the function call message.
if (auto Err = C.startSendMessage(FnId, SeqNo)) {
abandonPendingResponses();
return Err;
}
// Serialize the call arguments.
if (auto Err = detail::HandlerTraits::serializeArgs(
C, Args...)) {
abandonPendingResponses();
return Err;
}
// Close the function call message.
if (auto Err = C.endSendMessage()) {
abandonPendingResponses();
return Err;
}
return Error::success();
}
Error sendAppendedCalls() { return C.send(); };
/// Append a call via appendCallAsync and, if that succeeds, immediately call
/// send() on the channel.
///
/// \param Handler response handler, moved into appendCallAsync.
/// \param Args call arguments, forwarded to appendCallAsync.
/// \returns the error from appendCallAsync if appending failed (send() is
///          then not attempted), otherwise the result of C.send().
template
Error callAsync(HandlerT Handler, const ArgTs &... Args) {
if (auto Err = appendCallAsync(std::move(Handler), Args...))
return Err;
return C.send();
}
/// Handle one incoming call.
///
/// Reads a message header (function id and sequence number) from the channel
/// and dispatches it:
///  - if the id equals ResponseId, the message is a response and is passed to
///    handleResponse;
///  - otherwise, if a handler is registered for the id in Handlers, that
///    handler is invoked with the channel and sequence number;
///  - otherwise an error constructed from the id and sequence number is
///    returned.
///
/// \returns the error (or success) produced by whichever path was taken.
Error handleOne() {
FunctionIdT FnId;
SequenceNumberT SeqNo;
// Both out-parameters are filled in by startReceiveMessage. A failure here
// abandons every pending response on this endpoint.
if (auto Err = C.startReceiveMessage(FnId, SeqNo)) {
abandonPendingResponses();
return Err;
}
// Responses are checked first, using the cached ResponseId rather than a
// map lookup.
if (FnId == ResponseId)
return handleResponse(SeqNo);
auto I = Handlers.find(FnId);
if (I != Handlers.end())
return I->second(C, SeqNo);
// else: No handler found. Report error to client?
return make_error>(FnId,
SeqNo);
}
/// Helper for handling setter procedures - this method returns a functor that
/// sets the variables referred to by Args... to values deserialized from the
/// channel.
/// E.g.
///
/// typedef Function<0, bool, int> Func1;
///
/// ...
/// bool B;
/// int I;
/// if (auto Err = expect(Channel, readArgs(B, I)))
/// /* Handle Args */ ;
///
/// \param Args the variables to be set; they are taken by non-const
///        reference and passed on to the detail::ReadArgs constructor.
/// \returns a detail::ReadArgs object constructed from Args....
/// NOTE(review): presumably the returned functor keeps references to Args,
/// so the variables must outlive it -- detail::ReadArgs is not visible here;
/// confirm.
template
static detail::ReadArgs readArgs(ArgTs &... Args) {
return detail::ReadArgs(Args...);
}
/// Abandon every response that is still pending.
///
/// Each currently registered result handler has abandon() invoked on it (so
/// that, per the handler contract, it receives an "abandoned" error), after
/// which the pending-response map is emptied and the sequence number manager
/// is reset. The RPC layer uses this itself in error situations; clients that
/// are disconnecting from the remote and don't or can't expect responses to
/// their outstanding calls may also call it directly. (For outstanding
/// blocking calls in particular, calling this may be necessary to avoid dead
/// threads.)
void abandonPendingResponses() {
  // One mutex guards both the pending-response map and the sequence number
  // manager; hold it for the whole operation.
  std::lock_guard Guard(ResponsesMutex);

  for (auto &Pending : PendingResponses) {
    auto &ResponseHandler = Pending.second;
    ResponseHandler->abandon();
  }

  PendingResponses.clear();
  SequenceNumberMgr.reset();
}
/// Remove the handler for the given function.
/// A handler must currently be registered for this function.
///
/// The function's id is looked up in LocalFunctionIds by prototype and the
/// matching entry is erased from Handlers. The LocalFunctionIds entry itself
/// is left in place.
///
/// The precondition is only checked with assert: if it is violated in a
/// build where asserts are compiled out, an end() iterator is dereferenced.
template
void removeHandler() {
auto IdItr = LocalFunctionIds.find(Func::getPrototype());
assert(IdItr != LocalFunctionIds.end() &&
"Function does not have a registered handler");
auto HandlerItr = Handlers.find(IdItr->second);
assert(HandlerItr != Handlers.end() &&
"Function does not have a registered handler");
Handlers.erase(HandlerItr);
}
/// Drop every registered handler by emptying the handler map.
void clearHandlers() { Handlers.clear(); }
protected:
/// Returns the id that the function id allocator designates as invalid.
FunctionIdT getInvalidFunctionId() const { return FnIdAllocator.getInvalidId(); }
/// Add the given handler to the handler map and make it available for
/// autonegotiation and execution.
///
/// A fresh function id is allocated from FnIdAllocator, recorded in
/// LocalFunctionIds under Func's prototype (which is what handleNegotiate
/// looks up), and the handler is stored in Handlers after being adapted by
/// wrapHandler.
///
/// \param Handler the callable to register; it is moved into wrapHandler.
template
void addHandlerImpl(HandlerT Handler) {
// Compile-time check of the handler's argument types against Func's,
// using the CanDeserializeCheck predicate.
static_assert(detail::RPCArgTypeCheck<
CanDeserializeCheck, typename Func::Type,
typename detail::HandlerTraits::Type>::value,
"");
FunctionIdT NewFnId = FnIdAllocator.template allocate();
LocalFunctionIds[Func::getPrototype()] = NewFnId;
Handlers[NewFnId] = wrapHandler(std::move(Handler));
}
/// Asynchronous counterpart of addHandlerImpl: registers the given handler
/// under a freshly allocated function id.
///
/// The id is recorded in LocalFunctionIds under Func's prototype and the
/// handler is stored in Handlers after being adapted by wrapAsyncHandler
/// (rather than wrapHandler). The static type check is performed on the
/// signature produced by detail::AsyncHandlerTraits from the handler's
/// HandlerTraits type.
///
/// \param Handler the callable to register; it is moved into
///        wrapAsyncHandler.
template
void addAsyncHandlerImpl(HandlerT Handler) {
// Compile-time check of the handler's argument types against Func's,
// using the CanDeserializeCheck predicate.
static_assert(detail::RPCArgTypeCheck<
CanDeserializeCheck, typename Func::Type,
typename detail::AsyncHandlerTraits<
typename detail::HandlerTraits::Type
>::Type>::value,
"");
FunctionIdT NewFnId = FnIdAllocator.template allocate();
LocalFunctionIds[Func::getPrototype()] = NewFnId;
Handlers[NewFnId] = wrapAsyncHandler(std::move(Handler));
}
/// Process an incoming response message for the call numbered \p SeqNo.
///
/// The matching pending-response handler is removed from PendingResponses
/// and its sequence number is released while ResponsesMutex is held; the
/// handler itself is then run after the lock has been released.
///
/// \param SeqNo sequence number read from the response message header.
/// \returns Error::success() if the handler processed the response; an
///          InvalidSequenceNumberForResponse error if no call with this
///          sequence number is pending; or the handler's error. In both
///          failure cases all remaining pending responses are abandoned.
Error handleResponse(SequenceNumberT SeqNo) {
using Handler = typename decltype(PendingResponses)::mapped_type;
Handler PRHandler;
{
// Lock the pending responses map and sequence number manager.
std::unique_lock Lock(ResponsesMutex);
auto I = PendingResponses.find(SeqNo);
if (I != PendingResponses.end()) {
// Take ownership of the handler so it can be run outside the lock.
PRHandler = std::move(I->second);
PendingResponses.erase(I);
SequenceNumberMgr.releaseSequenceNumber(SeqNo);
} else {
// Unlock the pending results map to prevent recursive lock.
// (abandonPendingResponses acquires ResponsesMutex itself.)
Lock.unlock();
abandonPendingResponses();
return make_error<
InvalidSequenceNumberForResponse>(SeqNo);
}
}
assert(PRHandler &&
"If we didn't find a response handler we should have bailed out");
// ResponsesMutex is no longer held here, so calling
// abandonPendingResponses() on failure does not re-lock a held mutex.
if (auto Err = PRHandler->handleResponse(C)) {
abandonPendingResponses();
return Err;
}
return Error::success();
}
/// Handler body for the built-in negotiate function.
///
/// \param Name key to look up in LocalFunctionIds.
/// \returns the id registered under \p Name, or the invalid function id when
///          no entry exists for it.
FunctionIdT handleNegotiate(const std::string &Name) {
  const auto Found = LocalFunctionIds.find(Name);
  return Found != LocalFunctionIds.end() ? Found->second
                                         : getInvalidFunctionId();
}
// Find the remote FunctionId for the given function.
template
Expected