absl::Status AcsAgentClient::RegisterConnection()

in cpp/acs_agent_client.cc [347:405]


// Sends the registration `request` through the reactor and blocks until a
// status is delivered for it or `max_wait_time_for_ack_` elapses.
//
// Returns:
//   - the status delivered through the promise registered under the request's
//     message id, if it becomes ready within the wait window;
//   - AlreadyExistsError if a promise is already registered under the same
//     message id (the request is not handed to the reactor in that case);
//   - InternalError if the reactor refuses the request, or if the future
//     unexpectedly reports a deferred state;
//   - DeadlineExceededError if nothing is delivered in time.
absl::Status AcsAgentClient::RegisterConnection(const Request& request) {
  // Register a promise keyed by the message id so the response path can hand
  // the status back to us, then add the request to the reactor.
  const std::string& message_id = request.message_id();
  std::promise<absl::Status> response_promise;
  std::future<absl::Status> response_future = response_promise.get_future();
  {
    absl::MutexLock lock(&request_delivery_status_mtx_);
    const bool inserted =
        attempted_requests_responses_sub_
            .emplace(message_id, std::move(response_promise))
            .second;
    if (!inserted) {
      // Our promise did not make it into the map, so nobody could ever fulfil
      // the future we would wait on, and the clean-up below would complete and
      // erase the promise that belongs to the other in-flight request. Bail
      // out before touching either the reactor or that entry.
      return absl::AlreadyExistsError(absl::StrFormat(
          "A request with message id: %s is already waiting for a response.",
          message_id));
    }
  }

  if (!reactor_->AddRequest(request)) {
    // Nothing was sent, so no response will arrive: set a dummy status value
    // and clean up the promise we just registered.
    absl::MutexLock lock(&request_delivery_status_mtx_);
    SetValueAndRemovePromise(message_id, absl::OkStatus());
    return absl::InternalError(
        "Failed to add registration request to reactor, because the existing "
        "write buffer is full. This should never happen, because the "
        "registration request should be the first request sent to the server.");
  }

  // Now that we have added the request to the reactor, wait for the response
  // from the server.
  // Note that during the wait here, we hold the reactor_mtx_ lock. This is
  // intentional to keep the client from sending any other requests. The
  // downside is that if reactor calls OnReadDone(ok=false), which indicates the
  // failure of register connection, ReactorReadCallback() will not be able to
  // acquire the reactor_mtx_ lock until the wait here is done. This is fine
  // for now, because this function will still return a failed status, and wait
  // for the caller of this class to retry.
  const std::future_status wait_result =
      response_future.wait_for(max_wait_time_for_ack_);
  if (wait_result == std::future_status::ready) {
    return response_future.get();
  }

  absl::Status received_status = absl::OkStatus();
  if (wait_result == std::future_status::timeout) {
    ABSL_VLOG(1) << "timeout of waiting for response: " << message_id;
    received_status = absl::DeadlineExceededError(absl::StrFormat(
        "Timeout waiting for promise to be set for message with id: %s.",
        message_id));
  } else if (wait_result == std::future_status::deferred) {
    ABSL_LOG(WARNING)
        << "This should never happen: get a deferred status from the future "
           "when waiting for response for message with id: "
        << message_id;
    received_status = absl::InternalError(absl::StrFormat(
        "Future is deferred for message with id: %s. This should never happen.",
        message_id));
  }

  // Set a dummy status value and clean up the promise if we don't receive the
  // response from the server.
  absl::MutexLock lock(&request_delivery_status_mtx_);
  SetValueAndRemovePromise(message_id, absl::OkStatus());
  return received_status;
}