CoTryTask call()

in src/common/serde/ClientContext.h [40:131]


  // Sends `req` as an RPC request and suspends until the matching response,
  // an error, or a timeout is delivered through the Waiter item bound to
  // this call.
  //
  // Parameters:
  //   req           - request object wrapped into a MessagePacket and sent.
  //   customOptions - optional per-call overrides; when non-null they are
  //                   merged into a copy of the context-wide options.
  //   timestamp     - optional out-parameter. When non-null, a timestamp is
  //                   attached to the request, and on success it is
  //                   overwritten with the timestamp carried by the response
  //                   (if any) plus the client wake-up time.
  //
  // Returns:
  //   The deserialized `Result<Rsp>` on success; otherwise an error built
  //   from the waiter status, from the deserialization failure, or
  //   StatusCode::kFoundBug when the connection source is neither an
  //   IOWorker nor a Transport.
  CoTryTask<Rsp> call(const Req &req,
                      const net::UserRequestOptions *customOptions = nullptr,
                      Timestamp *timestamp = nullptr) {
    // Work on a private copy of the shared options so per-call overrides do
    // not affect other callers.
    auto options = *options_.load(std::memory_order_acquire);
    if (customOptions != nullptr) {
      options.merge(*customOptions);
    }

    // Register a waiter item; the returned uuid is stamped on the packet and
    // the write item below and is used again when scheduling the timeout.
    net::Waiter::Item item;
    uint64_t uuid = net::Waiter::instance().bind(item);

    // Latency logging and metrics both need a timestamp; fall back to a
    // local one when the caller did not supply storage for it.
    Timestamp ts;
    if ((options.logLongRunningThreshold != 0_ms || options.reportMetrics) && timestamp == nullptr) {
      timestamp = &ts;
    }

    // Build the request packet: identify the target method and translate
    // the relevant options into packet flags.
    MessagePacket packet(req);
    packet.uuid = uuid;
    packet.serviceId = ServiceID;
    packet.methodId = MethodID;
    packet.flags = EssentialFlags::IsReq;
    if (options.compression) {
      packet.flags |= EssentialFlags::UseCompress;
    }
    if (options.enableRDMAControl) {
      packet.flags |= EssentialFlags::ControlRDMA;
    }
    if (timestamp != nullptr) {
      // Seed the packet timestamp with the current UTC clock reading.
      packet.timestamp = Timestamp{UtcClock::now().time_since_epoch().count()};
    }

    auto writeItem = net::WriteItem::createMessage(packet, options);
    writeItem->uuid = uuid;
    // Capture the length now: writeItem is moved into the send call below.
    auto requestLength = writeItem->buf->length();
    // Dispatch through whichever connection source this context holds.
    if (LIKELY(std::holds_alternative<net::IOWorker *>(connectionSource_))) {
      std::get<net::IOWorker *>(connectionSource_)->sendAsync(destAddr_, net::WriteList(std::move(writeItem)));
    } else if (std::holds_alternative<net::Transport *>(connectionSource_)) {
      std::get<net::Transport *>(connectionSource_)->send(net::WriteList(std::move(writeItem)));
    } else {
      // NOTE(review): `item` was bound to the Waiter above, but this path
      // returns without scheduling or awaiting it — confirm the Waiter does
      // not keep a reference to the soon-to-be-destroyed local item.
      co_return MAKE_ERROR_F(StatusCode::kFoundBug,
                             "Sync client call async method: service id {}, method id {}",
                             ServiceID,
                             MethodID);
    }

    // Presumably arms the per-request timeout for this uuid (TODO confirm
    // against net::Waiter); then suspend until the item's baton is posted.
    net::Waiter::instance().schedule(uuid, options.timeout);
    co_await item.baton;

    if (UNLIKELY(!item.status)) {
      // On a timeout over an IOWorker-managed connection, close the
      // transport the request was associated with (if any) and ask the
      // worker to check its connections to the destination.
      if (item.status.code() == RPCCode::kTimeout && std::holds_alternative<net::IOWorker *>(connectionSource_)) {
        if (item.transport) {
          XLOGF(INFO, "req timeout and close transport {}", fmt::ptr(item.transport.get()));
          co_await item.transport->closeIB();
        } else {
          XLOGF(INFO, "req timeout but no transport");
        }
        std::get<net::IOWorker *>(connectionSource_)->checkConnections(destAddr_, Duration::zero());
      }
      co_return makeError(std::move(item.status));
    }

    // Start from a placeholder error; deserialize() is handed `rsp` to fill
    // in from the response payload.
    Result<Rsp> rsp = makeError(StatusCode::kUnknown);
    auto deserializeResult = serde::deserialize(rsp, item.packet.payload);
    if (UNLIKELY(!deserializeResult)) {
      XLOGF(ERR, "deserialize rsp error: {}", deserializeResult.error());
      // A payload that cannot be decoded marks the transport as invalid.
      if (item.transport) {
        item.transport->invalidate();
      }
      co_return makeError(std::move(deserializeResult.error()));
    }
    // Adopt the timestamp carried by the response and record when this
    // coroutine resumed on the client.
    if (timestamp != nullptr && item.packet.timestamp.has_value()) {
      *timestamp = *item.packet.timestamp;
      timestamp->clientWaked = UtcClock::now().time_since_epoch().count();
    }
    // `timestamp` is non-null whenever the threshold is non-zero: the block
    // near the top points it at the local `ts` if the caller passed nullptr.
    if (options.logLongRunningThreshold != 0_ms && timestamp->totalLatency() >= options.logLongRunningThreshold) {
      XLOGF(WARNING,
            "req takes too long, total {}, server {}, network {}, queue {}\ndetails {}",
            timestamp->totalLatency(),
            timestamp->serverLatency(),
            timestamp->networkLatency(),
            timestamp->queueLatency(),
            serde::toJsonString(*timestamp));
    }
    if (timestamp && options.reportMetrics) {
      // Function-local statics: the "service::method" name and its tag set
      // are built once and reused on later calls.
      static const auto methodName = fmt::format("{}::{}",
                                                 static_cast<std::string_view>(kServiceName),
                                                 static_cast<std::string_view>(kMethodName));
      static const auto tags = monitor::TagSet::create("instance", methodName);
      reportMetrics(tags, timestamp, requestLength, item.buf->length());
    }
    co_return rsp;
  }