void processSerializedRequest()

in thrift/lib/py/server/CppServerWrapper.cpp [238:401]


  /**
   * Forwards one serialized request to the Python processor behind
   * `adapter_`.
   *
   * The work is packaged as a task and queued on `tm` (through the
   * priority-aware `add` overload when `tm` is a PriorityThreadManager).
   * When the task runs it:
   *   1. takes the GIL,
   *   2. exposes the request bytes to Python as a read-only buffer,
   *   3. builds a CONTEXT_DATA object and a CALLBACK_WRAPPER object,
   *   4. calls `adapter_.call_processor(...)` with them.
   * The serialized response is handed back through the callback installed on
   * the CALLBACK_WRAPPER, which schedules the reply (or exception) on the
   * request's EventBase.
   *
   * Ownership: the request object is always released from inside
   * `runInEventBaseThread`, whichever path is taken.
   *
   * @param req                owning pointer to the request being processed
   * @param serializedRequest  request payload; consumed by this call
   * @param protType           protocol used to frame the request/response
   * @param context            per-request context (headers, method name, ...)
   * @param eb                 event base on which the request is completed
   * @param tm                 thread manager that runs the Python call
   */
  void processSerializedRequest(
      ResponseChannelRequest::UniquePtr req,
      apache::thrift::SerializedRequest&& serializedRequest,
      apache::thrift::protocol::PROTOCOL_TYPES protType,
      Cpp2RequestContext* context,
      folly::EventBase* eb,
      apache::thrift::concurrency::ThreadManager* tm) override {
    // Only used synchronously below, so a reference avoids copying the name.
    const auto& fname = context->getMethodName();
    bool oneway = isOnewayMethod(fname);

    // The method is oneway but the request itself is not flagged as such:
    // answer right away with an empty payload.
    if (oneway && !req->isOneway()) {
      req->sendReply(ResponsePayload{});
    }

    apache::thrift::LegacyRequestExpiryGuard rh{std::move(req), eb};
    // Captures are spelled out (instead of `[=]`) so it is explicit what the
    // task carries to the worker thread, including `this` for `adapter_`.
    auto task = [this,
                 oneway,
                 context,
                 protType,
                 buf = apache::thrift::LegacySerializedRequest(
                           protType,
                           context->getProtoSeqId(),
                           context->getMethodName(),
                           std::move(serializedRequest))
                           .buffer,
                 rh = std::move(rh)]() mutable {
      auto req_up = std::move(rh.req);
      // If ownership of the request is still held here when the task ends
      // (early return or exception), release it on the event base thread.
      // Once `req_up` has been moved into the callback below this only
      // forwards an empty pointer.
      SCOPE_EXIT {
        rh.eb->runInEventBaseThread(
            [req_up = std::move(req_up)]() mutable { req_up = {}; });
      };

      if (!oneway && !req_up->getShouldStartProcessing()) {
        return;
      }

      // Flatten the request into one contiguous range; Python is given a view
      // onto this memory rather than a copy.
      folly::ByteRange input_range = buf->coalesce();
      auto input_data = const_cast<unsigned char*>(input_range.data());
      auto clientType = context->getHeader()->getClientType();

      {
        // Everything in this scope touches Python objects and needs the GIL.
        PyGILState_STATE state = PyGILState_Ensure();
        SCOPE_EXIT { PyGILState_Release(state); };

#if PY_MAJOR_VERSION == 2
        auto input =
            handle<>(PyBuffer_FromMemory(input_data, input_range.size()));
#else
        auto input = handle<>(PyMemoryView_FromMemory(
            reinterpret_cast<char*>(input_data),
            input_range.size(),
            PyBUF_READ));
#endif

        auto cd_ctor = adapter_->attr("CONTEXT_DATA");
        object contextData = cd_ctor();
        extract<CppContextData&>(contextData)().copyContextContents(context);

        auto cb_ctor = adapter_->attr("CALLBACK_WRAPPER");
        object callbackWrapper = cb_ctor();
        extract<CallbackWrapper&>(callbackWrapper)().setCallback(
            [oneway,
             req_up = std::move(req_up),
             context,
             eb = rh.eb,
             contextData,
             protType](object output) mutable {
              // Make sure the request is deleted in evb.
              SCOPE_EXIT {
                eb->runInEventBaseThread(
                    [req_up = std::move(req_up)]() mutable { req_up = {}; });
              };

              // Always called from python so no need to grab GIL.
              try {
                std::unique_ptr<folly::IOBuf> outbuf;
                if (output.is_none()) {
                  throw std::runtime_error(
                      "Unexpected error in processor method");
                }
                PyObject* output_ptr = output.ptr();
                // Lengths stay in the Python API's native size type and are
                // cast explicitly; storing them in `int` would truncate
                // payloads of 2 GiB or more before the copy.
#if PY_MAJOR_VERSION == 2
                if (PyString_Check(output_ptr)) {
                  const auto len = PyString_Size(output_ptr);
                  if (len == 0) {
                    return;
                  }
                  outbuf = folly::IOBuf::copyBuffer(
                      extract<const char*>(output), static_cast<size_t>(len));
                } else
#endif
                    if (PyBytes_Check(output_ptr)) {
                  const auto len = PyBytes_Size(output_ptr);
                  if (len == 0) {
                    // Empty output: nothing to send back.
                    return;
                  }
                  outbuf = folly::IOBuf::copyBuffer(
                      PyBytes_AsString(output_ptr), static_cast<size_t>(len));
                } else {
                  throw std::runtime_error(
                      "Return from processor "
                      "method is not string or bytes");
                }

                if (!req_up->isActive()) {
                  return;
                }
                // Copy exception header values recorded on the context data
                // object onto the outgoing header.
                CppContextData& cppContextData =
                    extract<CppContextData&>(contextData);
                if (!cppContextData.getHeaderEx().empty()) {
                  context->getHeader()->setHeader(
                      kHeaderEx, cppContextData.getHeaderEx());
                }
                if (!cppContextData.getHeaderExWhat().empty()) {
                  context->getHeader()->setHeader(
                      kHeaderExWhat, cppContextData.getHeaderExWhat());
                }
                auto response = LegacySerializedResponse{std::move(outbuf)};
                auto [mtype, payload] = std::move(response).extractPayload(
                    req_up->includeEnvelope(), protType);
                payload.transform(context->getHeader()->getWriteTransforms());
                // The actual send happens on the event base thread, which
                // takes over ownership of the request.
                eb->runInEventBaseThread(
                    [mtype = mtype,
                     req_up = std::move(req_up),
                     payload = std::move(payload)]() mutable {
                      if (mtype == MessageType::T_REPLY) {
                        req_up->sendReply(std::move(payload));
                      } else if (mtype == MessageType::T_EXCEPTION) {
                        req_up->sendException(std::move(payload));
                      } else {
                        LOG(ERROR) << "Invalid type. type=" << uint16_t(mtype);
                      }
                    });
              } catch (const std::exception& e) {
                // Every throwing step above runs before `req_up` is handed to
                // the event base, so the pointer is still owned here.
                if (!oneway) {
                  req_up->sendErrorWrapped(
                      folly::make_exception_wrapper<TApplicationException>(
                          folly::to<std::string>(
                              "Failed to read response from Python:",
                              e.what())),
                      "python");
                }
              }
            });

        adapter_->attr("call_processor")(
            input,
            makePythonHeaders(context->getHeader()->getHeaders(), context),
            int(clientType),
            int(protType),
            contextData,
            callbackWrapper);
      }
    };

    // Use the per-method priority when the thread manager supports it.
    using PriorityThreadManager =
        apache::thrift::concurrency::PriorityThreadManager;
    auto ptm = dynamic_cast<PriorityThreadManager*>(tm);
    if (ptm != nullptr) {
      ptm->add(
          getMethodPriority(fname, context),
          std::make_shared<apache::thrift::concurrency::FunctionRunner>(
              std::move(task)));
      return;
    }
    tm->add(std::move(task));
  }