in cpp/acs_agent_client.cc [407:458]
void AcsAgentClient::RestartClient() {
while (true) {
reactor_mtx_.LockWhen(absl::Condition(
+[](ClientState* stream_state) {
return *stream_state == ClientState::kStreamTemporarilyDown ||
*stream_state == ClientState::kShutdown;
},
&stream_state_));
// Terminate the thread if the client is being shutdown.
if (stream_state_ == ClientState::kShutdown) {
reactor_mtx_.Unlock();
return;
}
// Wait for the reactor to be terminated, capture the status, and then
// restart the reactor.
// TODO: need to determine if we want to retry based on the status, and add
// retry logic with backoff mechanism.
if (reactor_ != nullptr) {
grpc::Status status = reactor_->Await();
ABSL_VLOG(1) << absl::StrFormat(
"RestartReactor thread trying to restart the stream with previous "
"termination status code: %d and message: %s and details: %s",
status.error_code(), status.error_message(), status.error_details());
}
std::unique_ptr<AcsStub> stub = GenerateConnectionIdAndStub();
if (stub == nullptr) {
stream_state_ = ClientState::kStreamFailedToInitialize;
reactor_mtx_.Unlock();
return;
}
reactor_ = std::make_unique<AcsAgentClientReactor>(
std::move(stub),
absl::bind_front(&AcsAgentClient::ReactorReadCallback, this),
connection_id_);
if (reactor_ == nullptr) {
stream_state_ = ClientState::kStreamFailedToInitialize;
reactor_mtx_.Unlock();
ABSL_LOG(WARNING) << "Failed to generate connection id and reactor.";
return;
}
// Initialize the client.
absl::Status init_status = Init();
if (!init_status.ok()) {
stream_state_ = ClientState::kStreamFailedToInitialize;
reactor_mtx_.Unlock();
return;
}
reactor_mtx_.Unlock();
}
}