in cpp/acs_agent_client.cc [206:285]
// Queues `request` on the reactor and blocks until a delivery status for it is
// available, the wait for the acknowledgement times out, or the request is
// rejected before it can be queued.
//
// A promise keyed by the request's message id is registered in
// `attempted_requests_responses_sub_` before the request is handed to the
// reactor; the matching future is what this function waits on. Every path that
// gives up on the request (rejection, failure to enqueue, timeout, deferred
// future) removes that promise again via SetValueAndRemovePromise() so no
// stale entry is left behind.
//
// Returns:
//   - The status obtained from the future when it becomes ready within
//     `max_wait_time_for_ack_` (presumably set by the response-handling path;
//     confirm against SetValueAndRemovePromise() callers).
//   - InternalError if `request` is a registration request and the stream is
//     neither kStreamNotInitialized nor kStreamTemporarilyDown.
//   - FailedPreconditionError if `request` carries a message body and the
//     stream is not kReady.
//   - ResourceExhaustedError if the reactor did not accept the request in any
//     of the attempts.
//   - DeadlineExceededError if the future is not ready within
//     `max_wait_time_for_ack_`.
//   - InternalError if the future unexpectedly reports a deferred state.
absl::Status AcsAgentClient::AddRequestAndWaitForResponse(
    const Request& request) {
  // TODO: Make the retry parameters configurable.
  constexpr int kMaxAddAttempts = 5;
  constexpr int kInitialBackoffMillis = 100;
  constexpr int kMaxBackoffMillis = 2000;

  // Create the promise-future pair used to wait for the response from the
  // server, and register the promise before queueing the request so that the
  // entry is already present when a response is processed.
  const std::string& message_id = request.message_id();
  std::promise<absl::Status> response_promise;
  std::future<absl::Status> response_future = response_promise.get_future();
  {
    absl::MutexLock lock(&request_delivery_status_mtx_);
    attempted_requests_responses_sub_.emplace(message_id,
                                              std::move(response_promise));
  }

  bool added_to_reactor = false;
  // Non-OK when the stream state forbids sending this request. It is recorded
  // here instead of returned directly so that the registered promise can be
  // cleaned up after `reactor_mtx_` has been released.
  absl::Status rejection_status = absl::OkStatus();
  for (int attempt = 0; attempt < kMaxAddAttempts; ++attempt) {
    {
      absl::MutexLock lock(&reactor_mtx_);
      if (request.has_register_connection() &&
          stream_state_ != ClientState::kStreamNotInitialized &&
          stream_state_ != ClientState::kStreamTemporarilyDown) {
        rejection_status = absl::InternalError(
            "The stream is not in the correct state to accept new registration "
            "request.");
        break;
      }
      if (request.has_message_body() && stream_state_ != ClientState::kReady) {
        rejection_status = absl::FailedPreconditionError(
            "The stream is not ready to accept new MessageBody.");
        break;
      }
      if (reactor_->AddRequest(request)) {
        added_to_reactor = true;
        break;
      }
    }
    // Exponential backoff between attempts, capped at kMaxBackoffMillis. There
    // is nothing left to wait for after the final attempt, so do not sleep.
    if (attempt + 1 < kMaxAddAttempts) {
      const int delay_millis =
          std::min(kInitialBackoffMillis * (1 << attempt), kMaxBackoffMillis);
      absl::SleepFor(absl::Milliseconds(delay_millis));
    }
  }

  if (!rejection_status.ok()) {
    // Set a dummy status value and clean up the promise: the request was never
    // handed to the reactor, so nobody else will remove the entry.
    absl::MutexLock lock(&request_delivery_status_mtx_);
    SetValueAndRemovePromise(message_id, absl::OkStatus());
    return rejection_status;
  }

  if (!added_to_reactor) {
    // Set a dummy status value and clean up the promise if we fail to add the
    // request to the reactor.
    absl::MutexLock lock(&request_delivery_status_mtx_);
    SetValueAndRemovePromise(message_id, absl::OkStatus());
    ABSL_VLOG(1) << absl::StrFormat(
        "Failed to add message with id: %s to reactor as the ongoing write "
        "takes too long.",
        message_id);
    // The reactor stayed busy for every attempt: this is transient
    // back-pressure, not a duplicate entity, hence ResourceExhausted.
    return absl::ResourceExhaustedError(
        "Failed to add message to reactor because the ongoing write takes too "
        "long.");
  }

  // Now that we have added the request to the reactor, wait for the response
  // from the server.
  const std::future_status wait_result =
      response_future.wait_for(max_wait_time_for_ack_);
  if (wait_result == std::future_status::ready) {
    // No cleanup here: the value was set on the promise by another path, which
    // is presumably also responsible for removing the map entry.
    return response_future.get();
  }

  absl::Status failure_status = absl::OkStatus();
  if (wait_result == std::future_status::timeout) {
    ABSL_VLOG(1) << "timeout of waiting for response: " << message_id;
    failure_status = absl::DeadlineExceededError(absl::StrFormat(
        "Timeout waiting for promise to be set for message with id: %s.",
        message_id));
  }
  if (wait_result == std::future_status::deferred) {
    ABSL_LOG(WARNING)
        << "This should never happen: get a deferred status from the future "
           "when waiting for response for message with id: "
        << message_id;
    failure_status = absl::InternalError(absl::StrFormat(
        "Future is deferred for message with id: %s. This should never happen.",
        message_id));
  }

  // Set a dummy status value and clean up the promise if we don't receive the
  // response from the server.
  absl::MutexLock lock(&request_delivery_status_mtx_);
  SetValueAndRemovePromise(message_id, absl::OkStatus());
  return failure_status;
}