in cpp/acs_agent_client.cc [292:321]
void AcsAgentClient::ClientReadMessage() {
while (true) {
// This thread will be woken up by the ReactorReadCallback() when reactor
// calls OnReadDone() or woken up by Shutdown().
// Within every iteration, if we don't shutdown, we will pop out 1 message,
// exit the critical section, and then process the message by calling
// AckOnSuccessfulDelivery() and read_callback_. In this way, we can release
// the lock response_read_mtx_ and avoid blocking the OnReadDone() call of
// the reactor.
response_read_mtx_.LockWhen(
absl::Condition(this, &AcsAgentClient::ShouldWakeUpClientReadMessage));
if (client_read_state_ == ClientState::kShutdown) {
response_read_mtx_.Unlock();
return;
}
if (msg_responses_.empty()) {
response_read_mtx_.Unlock();
continue;
}
Response response = std::move(msg_responses_.front());
msg_responses_.pop();
response_read_mtx_.Unlock();
// Exit the critical section and process the message.
if (response.has_message_response()) {
AckOnSuccessfulDelivery(response);
}
read_callback_(std::move(response));
}
}