in src/common/serde/ClientContext.h [40:131]
CoTryTask<Rsp> call(const Req &req,
const net::UserRequestOptions *customOptions = nullptr,
Timestamp *timestamp = nullptr) {
auto options = *options_.load(std::memory_order_acquire);
if (customOptions != nullptr) {
options.merge(*customOptions);
}
net::Waiter::Item item;
uint64_t uuid = net::Waiter::instance().bind(item);
Timestamp ts;
if ((options.logLongRunningThreshold != 0_ms || options.reportMetrics) && timestamp == nullptr) {
timestamp = &ts;
}
MessagePacket packet(req);
packet.uuid = uuid;
packet.serviceId = ServiceID;
packet.methodId = MethodID;
packet.flags = EssentialFlags::IsReq;
if (options.compression) {
packet.flags |= EssentialFlags::UseCompress;
}
if (options.enableRDMAControl) {
packet.flags |= EssentialFlags::ControlRDMA;
}
if (timestamp != nullptr) {
packet.timestamp = Timestamp{UtcClock::now().time_since_epoch().count()};
}
auto writeItem = net::WriteItem::createMessage(packet, options);
writeItem->uuid = uuid;
auto requestLength = writeItem->buf->length();
if (LIKELY(std::holds_alternative<net::IOWorker *>(connectionSource_))) {
std::get<net::IOWorker *>(connectionSource_)->sendAsync(destAddr_, net::WriteList(std::move(writeItem)));
} else if (std::holds_alternative<net::Transport *>(connectionSource_)) {
std::get<net::Transport *>(connectionSource_)->send(net::WriteList(std::move(writeItem)));
} else {
co_return MAKE_ERROR_F(StatusCode::kFoundBug,
"Sync client call async method: service id {}, method id {}",
ServiceID,
MethodID);
}
net::Waiter::instance().schedule(uuid, options.timeout);
co_await item.baton;
if (UNLIKELY(!item.status)) {
if (item.status.code() == RPCCode::kTimeout && std::holds_alternative<net::IOWorker *>(connectionSource_)) {
if (item.transport) {
XLOGF(INFO, "req timeout and close transport {}", fmt::ptr(item.transport.get()));
co_await item.transport->closeIB();
} else {
XLOGF(INFO, "req timeout but no transport");
}
std::get<net::IOWorker *>(connectionSource_)->checkConnections(destAddr_, Duration::zero());
}
co_return makeError(std::move(item.status));
}
Result<Rsp> rsp = makeError(StatusCode::kUnknown);
auto deserializeResult = serde::deserialize(rsp, item.packet.payload);
if (UNLIKELY(!deserializeResult)) {
XLOGF(ERR, "deserialize rsp error: {}", deserializeResult.error());
if (item.transport) {
item.transport->invalidate();
}
co_return makeError(std::move(deserializeResult.error()));
}
if (timestamp != nullptr && item.packet.timestamp.has_value()) {
*timestamp = *item.packet.timestamp;
timestamp->clientWaked = UtcClock::now().time_since_epoch().count();
}
if (options.logLongRunningThreshold != 0_ms && timestamp->totalLatency() >= options.logLongRunningThreshold) {
XLOGF(WARNING,
"req takes too long, total {}, server {}, network {}, queue {}\ndetails {}",
timestamp->totalLatency(),
timestamp->serverLatency(),
timestamp->networkLatency(),
timestamp->queueLatency(),
serde::toJsonString(*timestamp));
}
if (timestamp && options.reportMetrics) {
static const auto methodName = fmt::format("{}::{}",
static_cast<std::string_view>(kServiceName),
static_cast<std::string_view>(kMethodName));
static const auto tags = monitor::TagSet::create("instance", methodName);
reportMetrics(tags, timestamp, requestLength, item.buf->length());
}
co_return rsp;
}