in cpp/acs_agent_client.cc [347:405]
absl::Status AcsAgentClient::RegisterConnection(const Request& request) {
// Add request message to the reactor and create the promise-future pair to
// wait for the response from the server.
const std::string& message_id = request.message_id();
std::promise<absl::Status> responsePromise;
std::future<absl::Status> responseFuture = responsePromise.get_future();
{
absl::MutexLock lock(&request_delivery_status_mtx_);
attempted_requests_responses_sub_.emplace(message_id,
std::move(responsePromise));
}
bool added_to_reactor = reactor_->AddRequest(request);
if (!added_to_reactor) {
absl::MutexLock lock(&request_delivery_status_mtx_);
SetValueAndRemovePromise(message_id, absl::OkStatus());
return absl::InternalError(
"Failed to add registration request to reactor, because the existing "
"write buffer is full. This should never happen, because the "
"registration request should be the first request sent to the server.");
}
// Now that we have added the request to the reactor, wait for the response
// from the server.
// Note that during the wait here, we hold the reactor_mtx_ lock. This is
// intentional to keep the client from sending any other requests. The
// downside is that if reactor calls OnReadDone(ok=false), which indicates the
// failure of register connection, ReactorReadCallback() will not be able to
// acquire the reactor_mtx_ lock until the wait here is done. This is fine
// for now, because this function will still return a failed status, and wait
// for the caller of this class to retry.
std::future_status status = responseFuture.wait_for(max_wait_time_for_ack_);
absl::Status received_status = absl::OkStatus();
if (status == std::future_status::ready) {
received_status = responseFuture.get();
return received_status;
}
if (status == std::future_status::timeout) {
ABSL_VLOG(1) << "timeout of waiting for response: " << message_id;
received_status = absl::DeadlineExceededError(absl::StrFormat(
"Timeout waiting for promise to be set for message with id: %s.",
message_id));
}
if (status == std::future_status::deferred) {
ABSL_LOG(WARNING)
<< "This should never happen: get a deferred status from the future "
"when waiting for response for message with id: "
<< message_id;
received_status = absl::InternalError(absl::StrFormat(
"Future is deferred for message with id: %s. This should never happen.",
message_id));
}
// Set a dummy status value and clean up the promise if we don't receive the
// response from the server.
absl::MutexLock lock(&request_delivery_status_mtx_);
SetValueAndRemovePromise(message_id, absl::OkStatus());
return received_status;
}