in cpp-channel/cpp/HsChannel.cpp [58:148]
void ChannelWrapper::sendRequestImpl(
ChannelWrapper::RequestDirection requestDirection,
apache::thrift::protocol::PROTOCOL_TYPES protocolId,
CallbackPtr&& callback,
std::unique_ptr<folly::IOBuf>&& message,
apache::thrift::RpcOptions&& rpcOptions) {
auto header = std::make_shared<apache::thrift::transport::THeader>(0);
header->setProtocolId(protocolId);
header->setHeaders(rpcOptions.releaseWriteHeaders());
auto envelopeAndRequest =
apache::thrift::EnvelopeUtil::stripRequestEnvelope(std::move(message));
if (!envelopeAndRequest.has_value()) {
callback.release()->onResponseError(
folly::make_exception_wrapper<
apache::thrift::transport::TTransportException>(
apache::thrift::transport::TTransportException::CORRUPTED_DATA,
"Unexpected problem stripping envelope"));
return;
}
auto envelope = std::move(envelopeAndRequest->first);
callback->setMethodName(envelope.methodName);
// Create a new context-stack for the request, which will be used to trigger
// the appropriate thrift middleware to run on the request in itself (e.g.
// ContextProp).
//
// Note that we need to be very careful about the lifetime of the object
// and everything it does reference, as this can cause issues with memory
// leaks.
//
// This is why we're directly referencing shared-pointers as well as
// preserving the lifetime of the stack, and the method-name, on the callback
// object itself.
auto contextStack = apache::thrift::ContextStack::createWithClientContext(
handlers_,
"" /* service name */,
callback->getMethodName().c_str(),
*header);
if (contextStack) {
contextStack->preWrite();
}
auto request =
apache::thrift::SerializedRequest(std::move(envelopeAndRequest->second));
if (contextStack) {
apache::thrift::SerializedMessage serializedMessage;
serializedMessage.protocolType = envelope.protocolId;
serializedMessage.buffer = request.buffer.get();
serializedMessage.methodName = envelope.methodName;
contextStack->onWriteData(serializedMessage);
contextStack->postWrite(
folly::to_narrow(request.buffer->computeChainDataLength()));
contextStack->resetClientRequestContextHeader();
}
// Transfer ownership of the context-stack to the callback in order to
// preserve lifetime throughout the request
callback->setContextStack(std::move(contextStack));
runOnClientEvbIfAvailable([client = client_,
requestDirection = requestDirection,
rpcOptions = std::move(rpcOptions),
request = std::move(request),
header = std::move(header),
envelope = std::move(envelope),
callback = std::move(callback)]() mutable {
switch (requestDirection) {
case ChannelWrapper::RequestDirection::WITH_RESPONSE:
client->get()->sendRequestResponse(
std::move(rpcOptions),
envelope.methodName,
std::move(request),
std::move(header),
std::move(callback));
break;
case ChannelWrapper::RequestDirection::NO_RESPONSE:
client->get()->sendRequestNoResponse(
std::move(rpcOptions),
envelope.methodName,
std::move(request),
std::move(header),
std::move(callback));
break;
}
});
}