void ChannelWrapper::sendRequestImpl()

in cpp-channel/cpp/HsChannel.cpp [58:148]


// Sends an already-serialized thrift request through the wrapped client.
//
// Steps, in order:
//   1. Build a THeader carrying `protocolId` and the write headers released
//      from `rpcOptions`.
//   2. Strip the request envelope from `message`; if that fails, report a
//      CORRUPTED_DATA transport error through `callback` and return.
//   3. Record the method name on the callback, create a client context-stack
//      and run its write hooks against the stripped payload.
//   4. Store the context-stack on the callback and dispatch the request via
//      runOnClientEvbIfAvailable, using sendRequestResponse or
//      sendRequestNoResponse depending on `requestDirection`.
//
// `callback`, `message` and `rpcOptions` are consumed by this call.
void ChannelWrapper::sendRequestImpl(
    ChannelWrapper::RequestDirection requestDirection,
    apache::thrift::protocol::PROTOCOL_TYPES protocolId,
    CallbackPtr&& callback,
    std::unique_ptr<folly::IOBuf>&& message,
    apache::thrift::RpcOptions&& rpcOptions) {
  namespace thrift = apache::thrift;
  using thrift::transport::TTransportException;

  auto transportHeader = std::make_shared<thrift::transport::THeader>(0);
  transportHeader->setProtocolId(protocolId);
  transportHeader->setHeaders(rpcOptions.releaseWriteHeaders());

  auto stripped =
      thrift::EnvelopeUtil::stripRequestEnvelope(std::move(message));
  if (!stripped) {
    // Ownership leaves the smart pointer before the error is delivered, so
    // the callback is not touched through `callback` afterwards.
    auto* releasedCallback = callback.release();
    releasedCallback->onResponseError(
        folly::make_exception_wrapper<TTransportException>(
            TTransportException::CORRUPTED_DATA,
            "Unexpected problem stripping envelope"));
    return;
  }

  auto requestEnvelope = std::move(stripped->first);
  callback->setMethodName(requestEnvelope.methodName);

  // A fresh context-stack is built per request so that the registered thrift
  // middleware (e.g. ContextProp) gets a chance to run on it.
  //
  // Lifetimes matter here: the stack holds on to what it is given, and
  // getting this wrong can lead to memory leaks. For that reason the
  // method-name pointer handed over below comes from the copy stored on the
  // callback, the header is referenced through its shared-pointer, and the
  // stack itself is later parked on the callback as well.
  auto ctxStack = thrift::ContextStack::createWithClientContext(
      handlers_,
      "" /* service name */,
      callback->getMethodName().c_str(),
      *transportHeader);

  if (ctxStack) {
    ctxStack->preWrite();
  }

  auto payload = thrift::SerializedRequest(std::move(stripped->second));

  if (ctxStack) {
    // Describe the outgoing payload to the handlers; the buffer is only
    // borrowed, `payload` keeps owning it.
    thrift::SerializedMessage outgoing;
    outgoing.protocolType = requestEnvelope.protocolId;
    outgoing.buffer = payload.buffer.get();
    outgoing.methodName = requestEnvelope.methodName;
    ctxStack->onWriteData(outgoing);

    const auto payloadBytes = payload.buffer->computeChainDataLength();
    ctxStack->postWrite(folly::to_narrow(payloadBytes));
    ctxStack->resetClientRequestContextHeader();
  }

  // The callback becomes the owner of the context-stack so that it stays
  // alive for the whole duration of the request.
  callback->setContextStack(std::move(ctxStack));

  runOnClientEvbIfAvailable([target = client_,
                             direction = requestDirection,
                             options = std::move(rpcOptions),
                             payload = std::move(payload),
                             transportHeader = std::move(transportHeader),
                             requestEnvelope = std::move(requestEnvelope),
                             responseCallback = std::move(callback)]() mutable {
    using Direction = ChannelWrapper::RequestDirection;

    if (direction == Direction::WITH_RESPONSE) {
      target->get()->sendRequestResponse(
          std::move(options),
          requestEnvelope.methodName,
          std::move(payload),
          std::move(transportHeader),
          std::move(responseCallback));
    } else if (direction == Direction::NO_RESPONSE) {
      target->get()->sendRequestNoResponse(
          std::move(options),
          requestEnvelope.methodName,
          std::move(payload),
          std::move(transportHeader),
          std::move(responseCallback));
    }
  });
}