in thrift/lib/py/server/CppServerWrapper.cpp [238:401]
void processSerializedRequest(
ResponseChannelRequest::UniquePtr req,
apache::thrift::SerializedRequest&& serializedRequest,
apache::thrift::protocol::PROTOCOL_TYPES protType,
Cpp2RequestContext* context,
folly::EventBase* eb,
apache::thrift::concurrency::ThreadManager* tm) override {
auto fname = context->getMethodName();
bool oneway = isOnewayMethod(fname);
if (oneway && !req->isOneway()) {
req->sendReply(ResponsePayload{});
}
apache::thrift::LegacyRequestExpiryGuard rh{std::move(req), eb};
auto task = [=,
buf = apache::thrift::LegacySerializedRequest(
protType,
context->getProtoSeqId(),
context->getMethodName(),
std::move(serializedRequest))
.buffer,
rh = std::move(rh)]() mutable {
auto req_up = std::move(rh.req);
SCOPE_EXIT {
rh.eb->runInEventBaseThread(
[req_up = std::move(req_up)]() mutable { req_up = {}; });
};
if (!oneway && !req_up->getShouldStartProcessing()) {
return;
}
folly::ByteRange input_range = buf->coalesce();
auto input_data = const_cast<unsigned char*>(input_range.data());
auto clientType = context->getHeader()->getClientType();
{
PyGILState_STATE state = PyGILState_Ensure();
SCOPE_EXIT { PyGILState_Release(state); };
#if PY_MAJOR_VERSION == 2
auto input =
handle<>(PyBuffer_FromMemory(input_data, input_range.size()));
#else
auto input = handle<>(PyMemoryView_FromMemory(
reinterpret_cast<char*>(input_data),
input_range.size(),
PyBUF_READ));
#endif
auto cd_ctor = adapter_->attr("CONTEXT_DATA");
object contextData = cd_ctor();
extract<CppContextData&>(contextData)().copyContextContents(context);
auto cb_ctor = adapter_->attr("CALLBACK_WRAPPER");
object callbackWrapper = cb_ctor();
extract<CallbackWrapper&>(callbackWrapper)().setCallback(
[oneway,
req_up = std::move(req_up),
context,
eb = rh.eb,
contextData,
protType](object output) mutable {
// Make sure the request is deleted in evb.
SCOPE_EXIT {
eb->runInEventBaseThread(
[req_up = std::move(req_up)]() mutable { req_up = {}; });
};
// Always called from python so no need to grab GIL.
try {
std::unique_ptr<folly::IOBuf> outbuf;
if (output.is_none()) {
throw std::runtime_error(
"Unexpected error in processor method");
}
PyObject* output_ptr = output.ptr();
#if PY_MAJOR_VERSION == 2
if (PyString_Check(output_ptr)) {
int len = extract<int>(output.attr("__len__")());
if (len == 0) {
return;
}
outbuf = folly::IOBuf::copyBuffer(
extract<const char*>(output), len);
} else
#endif
if (PyBytes_Check(output_ptr)) {
int len = PyBytes_Size(output_ptr);
if (len == 0) {
return;
}
outbuf = folly::IOBuf::copyBuffer(
PyBytes_AsString(output_ptr), len);
} else {
throw std::runtime_error(
"Return from processor "
"method is not string or bytes");
}
if (!req_up->isActive()) {
return;
}
CppContextData& cppContextData =
extract<CppContextData&>(contextData);
if (!cppContextData.getHeaderEx().empty()) {
context->getHeader()->setHeader(
kHeaderEx, cppContextData.getHeaderEx());
}
if (!cppContextData.getHeaderExWhat().empty()) {
context->getHeader()->setHeader(
kHeaderExWhat, cppContextData.getHeaderExWhat());
}
auto response = LegacySerializedResponse{std::move(outbuf)};
auto [mtype, payload] = std::move(response).extractPayload(
req_up->includeEnvelope(), protType);
payload.transform(context->getHeader()->getWriteTransforms());
eb->runInEventBaseThread(
[mtype = mtype,
req_up = std::move(req_up),
payload = std::move(payload)]() mutable {
if (mtype == MessageType::T_REPLY) {
req_up->sendReply(std::move(payload));
} else if (mtype == MessageType::T_EXCEPTION) {
req_up->sendException(std::move(payload));
} else {
LOG(ERROR) << "Invalid type. type=" << uint16_t(mtype);
}
});
} catch (const std::exception& e) {
if (!oneway) {
req_up->sendErrorWrapped(
folly::make_exception_wrapper<TApplicationException>(
folly::to<std::string>(
"Failed to read response from Python:",
e.what())),
"python");
}
}
});
adapter_->attr("call_processor")(
input,
makePythonHeaders(context->getHeader()->getHeaders(), context),
int(clientType),
int(protType),
contextData,
callbackWrapper);
}
};
using PriorityThreadManager =
apache::thrift::concurrency::PriorityThreadManager;
auto ptm = dynamic_cast<PriorityThreadManager*>(tm);
if (ptm != nullptr) {
ptm->add(
getMethodPriority(fname, context),
std::make_shared<apache::thrift::concurrency::FunctionRunner>(
std::move(task)));
return;
}
tm->add(std::move(task));
}