absl::Status AcsAgentClient::AddRequestAndWaitForResponse()

in cpp/acs_agent_client.cc [206:285]


// Queues `request` on the reactor and blocks until the matching response is
// delivered through the promise registered under the request's message id, or
// until `max_wait_time_for_ack_` elapses.
//
// Returns:
//   - the status delivered through the promise when the response arrives in
//     time;
//   - InternalError if `request` is a registration request and the stream is
//     neither kStreamNotInitialized nor kStreamTemporarilyDown;
//   - FailedPreconditionError if `request` carries a message body and the
//     stream is not kReady;
//   - AlreadyExistsError if the reactor did not accept the request within the
//     retry budget;
//   - DeadlineExceededError if no response arrives in time;
//   - InternalError if the future unexpectedly reports a deferred state.
//
// On every non-success path the promise registered for this message id is
// resolved with a dummy value and removed, so no stale entry is left in
// `attempted_requests_responses_sub_`.
absl::Status AcsAgentClient::AddRequestAndWaitForResponse(
    const Request& request) {
  const std::string& message_id = request.message_id();

  // Register the promise before handing the request to the reactor, so the
  // entry already exists when a response for this message id is processed.
  std::promise<absl::Status> responsePromise;
  std::future<absl::Status> responseFuture = responsePromise.get_future();
  {
    absl::MutexLock lock(&request_delivery_status_mtx_);
    attempted_requests_responses_sub_.emplace(message_id,
                                              std::move(responsePromise));
  }

  // TODO: Make the retry parameters configurable.
  constexpr int kMaxAddAttempts = 5;
  bool added_to_reactor = false;
  for (int i = 0; i < kMaxAddAttempts; ++i) {
    absl::Status state_error = absl::OkStatus();
    {
      absl::MutexLock lock(&reactor_mtx_);
      if (request.has_register_connection() &&
          (stream_state_ != ClientState::kStreamNotInitialized &&
           stream_state_ != ClientState::kStreamTemporarilyDown)) {
        state_error = absl::InternalError(
            "The stream is not in the correct state to accept new registration "
            "request.");
      } else if (request.has_message_body() &&
                 stream_state_ != ClientState::kReady) {
        state_error = absl::FailedPreconditionError(
            "The stream is not ready to accept new MessageBody.");
      } else if (reactor_->AddRequest(request)) {
        added_to_reactor = true;
      }
    }
    if (!state_error.ok()) {
      // The request will never be sent, so nobody else will resolve the
      // promise registered above: clean it up here instead of leaking the map
      // entry. reactor_mtx_ has already been released, so the two mutexes are
      // never held at the same time in this function.
      absl::MutexLock lock(&request_delivery_status_mtx_);
      SetValueAndRemovePromise(message_id, absl::OkStatus());
      return state_error;
    }
    if (added_to_reactor) {
      break;
    }
    // Exponential back-off (100ms, 200ms, 400ms, ... capped at 2s) between
    // attempts. Sleeping after the final attempt would only delay the error.
    if (i + 1 < kMaxAddAttempts) {
      int delayMillis = std::min(100 * (1 << i), 2000);
      absl::SleepFor(absl::Milliseconds(delayMillis));
    }
  }

  if (!added_to_reactor) {
    // Set a dummy status value and clean up the promise if we fail to add the
    // request to the reactor.
    absl::MutexLock lock(&request_delivery_status_mtx_);
    SetValueAndRemovePromise(message_id, absl::OkStatus());
    ABSL_VLOG(1) << absl::StrFormat(
        "Failed to add message with id: %s to reactor as the ongoing write "
        "takes too long.",
        message_id);
    return absl::AlreadyExistsError(
        "Failed to add message to reactor because the ongoing write takes too "
        "long.");
  }

  // Now that we have added the request to the reactor, wait for the response
  // from the server.
  std::future_status status = responseFuture.wait_for(max_wait_time_for_ack_);
  if (status == std::future_status::ready) {
    return responseFuture.get();
  }

  absl::Status received_status = absl::OkStatus();
  if (status == std::future_status::timeout) {
    ABSL_VLOG(1) << "timeout of waiting for response: " << message_id;
    received_status = absl::DeadlineExceededError(absl::StrFormat(
        "Timeout waiting for promise to be set for message with id: %s.",
        message_id));
  }
  if (status == std::future_status::deferred) {
    ABSL_LOG(WARNING)
        << "This should never happen: get a deferred status from the future "
           "when waiting for response for message with id: "
        << message_id;
    received_status = absl::InternalError(absl::StrFormat(
        "Future is deferred for message with id: %s. This should never happen.",
        message_id));
  }
  // Set a dummy status value and clean up the promise if we don't receive the
  // response from the server.
  absl::MutexLock lock(&request_delivery_status_mtx_);
  SetValueAndRemovePromise(message_id, absl::OkStatus());
  return received_status;
}