// cpp/acs_agent_client.h (180 lines of code) (raw)

#ifndef THIRD_PARTY_AGENTCOMMUNICATION_CLIENT_CPP_ACS_AGENT_CLIENT_H_ #define THIRD_PARTY_AGENTCOMMUNICATION_CLIENT_CPP_ACS_AGENT_CLIENT_H_ #include <chrono> #include <cstdint> #include <future> #include <memory> #include <queue> #include <string> #include <thread> #include <unordered_map> #include <utility> #include "proto/agent_communication.grpc.pb.h" #include "absl/base/thread_annotations.h" #include "absl/functional/any_invocable.h" #include "absl/random/random.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/synchronization/mutex.h" #include "cpp/acs_agent_client_reactor.h" #include "cpp/acs_agent_helper.h" namespace agent_communication { // AcsAgentClient is a client library for Agent Communication Service (ACS). // This class is thread-safe. // This class provides a public API to send and receive messages from the ACS // server. It uses a gRPC client reactor to handle the communication with the // server. After the creation and initialization of this class, to send a // message to the server, the caller can send messages to the server by calling // SendMessage() or AddRequest(). On read, the caller can register a callback to // process the response from the server during the creation of this class. class AcsAgentClient { public: constexpr static std::chrono::milliseconds kDefaultMaxWaitTimeForAck = std::chrono::milliseconds(2'000); // Factory method to create a client. User only needs to supply the channel id // and the boolean to indicate if the endpoint is regional, without needing to // supply the whole connection id. static absl::StatusOr<std::unique_ptr<AcsAgentClient>> Create( bool endpoint_regional, std::string channel_id, absl::AnyInvocable<void( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse)> read_callback, std::chrono::milliseconds max_wait_time_for_ack = kDefaultMaxWaitTimeForAck); // Factory method to create a client. 
static absl::StatusOr<std::unique_ptr<AcsAgentClient>> Create( std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub> stub, AgentConnectionId agent_connection_id, absl::AnyInvocable<void( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse)> read_callback, absl::AnyInvocable<std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub>()> stub_generator, absl::AnyInvocable<absl::StatusOr<AgentConnectionId>()> connection_id_generator, std::chrono::milliseconds max_wait_time_for_ack = kDefaultMaxWaitTimeForAck); // Sends a StreamAgentMessagesRequest to the server. // It will automatically retry if the request was not acknowledged by the // server or the server returns a ResourceExhausted error. // We pass by non-const reference to allow the update of the message_id on // every retry, without the need to create a new StreamAgentMessagesRequest // object. Returns status of the send request. absl::Status AddRequest( google::cloud::agentcommunication::v1::StreamAgentMessagesRequest& request) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_) ABSL_LOCKS_EXCLUDED(reactor_mtx_); // Sends a MessageBody to the server. The input parameter // message_body will be moved to create a StreamAgentMessagesRequest and then // call AddRequest(). // Returns status of AddRequest(). absl::Status SendMessage( google::cloud::agentcommunication::v1::MessageBody message_body) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_) ABSL_LOCKS_EXCLUDED(reactor_mtx_); // Returns the quota of (# of messages per minute & # of bytes per minute) for // the agent to send messages to the server. Returns error status if not set // by server. User of the agent can use its own throttling mechanism to ensure // the health of the agent and properly handle the resource exhausted error. 
absl::StatusOr<uint64_t> GetMessagePerMinuteQuota() ABSL_LOCKS_EXCLUDED(reactor_mtx_); absl::StatusOr<uint64_t> GetBytesPerMinuteQuota() ABSL_LOCKS_EXCLUDED(reactor_mtx_); // Checks if the client is dead. If the caller of this class has failure of // sending or receiving messages, it can call this function to check if the // client is dead and needs a restart. If the client is dead, the caller can // call Shutdown() or directly invoke destructor to clean up the client. If // the client is not dead, the caller can retry sending messages later with a // self-determined backoff mechanism. bool IsDead() ABSL_LOCKS_EXCLUDED(reactor_mtx_); // Shuts down the client by joining the restart client thread and the // read_response_thread_, and then cancel the RPC. void Shutdown() ABSL_LOCKS_EXCLUDED(reactor_mtx_) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_); ~AcsAgentClient() { Shutdown(); } private: AcsAgentClient( AgentConnectionId connection_id, absl::AnyInvocable<void( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse)> read_callback, absl::AnyInvocable<std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub>()> stub_generator, absl::AnyInvocable<absl::StatusOr<AgentConnectionId>()> connection_id_generator, std::chrono::milliseconds max_wait_time_for_ack) : connection_id_(std::move(connection_id)), stub_generator_(std::move(stub_generator)), connection_id_generator_(std::move(connection_id_generator)), read_callback_(std::move(read_callback)), max_wait_time_for_ack_(max_wait_time_for_ack) {} // Initializes the client by registering the connection. absl::Status Init() ABSL_EXCLUSIVE_LOCKS_REQUIRED(reactor_mtx_) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_); // Adds a request to the buffer of the reactor and waits for the response from // the server. // After the message has been queued in the reactor, it will wait for the // response from the server by creating a promise and a future associated with // the message id. 
The promise will be set by the AckOnSuccessfulDelivery() // function when the response from the server is received. // This function will retry if the reactor's existing buffer is full. // Returns: status of the send request. absl::Status AddRequestAndWaitForResponse( const google::cloud::agentcommunication::v1::StreamAgentMessagesRequest& request) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_) ABSL_LOCKS_EXCLUDED(reactor_mtx_); // Registers the connection with the server. Note this function holds the // reactor_mtx_ lock entire time intentionally to keep the client from // sending any other requests. absl::Status RegisterConnection( const google::cloud::agentcommunication::v1::StreamAgentMessagesRequest& request) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_) ABSL_EXCLUSIVE_LOCKS_REQUIRED(reactor_mtx_); // Processes Responses from the server. Acknowledges the response from server // and then calls the read_callback_ to process the message. This function // will be executed in the read_response_thread_. void ClientReadMessage() ABSL_LOCKS_EXCLUDED(response_read_mtx_) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_); // Checks if the ClientReadMessage() should be woken up. bool ShouldWakeUpClientReadMessage() ABSL_EXCLUSIVE_LOCKS_REQUIRED(response_read_mtx_); // Callback invoked in OnReadDone of reactor to process the Response. // If OnReadDone(true), wakes up the read_response_thread_ and passes the // received Response to be handled by the ClientReadMessage(). In this way, // the reactor's OnReadDone call will return fast, and the actual processing // of the Response is delegated to the read_response_thread_. // If OnReadDone(false), the RPC is terminated, wakes up the // restart_client_thread_ to re-initialize the client. 
void ReactorReadCallback( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse response, AcsAgentClientReactor::RpcStatus status) ABSL_LOCKS_EXCLUDED(response_read_mtx_) ABSL_LOCKS_EXCLUDED(reactor_mtx_); // Acknowledges the Response from the server for a successful delivery of a // Request. ACS server normally sends a MessageResponse as an acknowledgement // for each received Request that has a MessageBody or RegisterConnection. // Within AddRequestAndWaitForResponse(), the client creates a promise and // a future associated with the message id. The promise will be set by this // AckOnSuccessfulDelivery() function when the MessageResponse from the server // associated with the message id is received. void AckOnSuccessfulDelivery( const google::cloud::agentcommunication::v1::StreamAgentMessagesResponse& response) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_); // Restarts the client when the RPC is terminated. This function will be // executed in the restart_client_thread_. // Listens to the ReactorReadCallback(). When OnReadDone(false), this function // will be waken up to restart the client, i.e., create a new stub and // connection id, re-create the stream, and register the connection. void RestartClient() ABSL_LOCKS_EXCLUDED(reactor_mtx_) ABSL_LOCKS_EXCLUDED(request_delivery_status_mtx_); // Generates a new stub and connection id and returns the stub, called within // the restart_client_thread_. std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub> GenerateConnectionIdAndStub() ABSL_EXCLUSIVE_LOCKS_REQUIRED(reactor_mtx_); // Creates a unique message id. Currently, it is "{random // number}-{current_timestamp}". The requirement of the uniqueness from ACS // server is not very strict: We just need to make sure the message id is // unique within a couple of seconds for each agent. 
std::string CreateMessageUuid() ABSL_EXCLUSIVE_LOCKS_REQUIRED(reactor_mtx_); // Set the value of the promise and remove the promise from the map. This // function is used to ensure: // 1. If the promise is not set due to any reason but we want to remove it, we // will set the value and then safely destroy the promise. // 2. Whenever we set the value of the promise, we safely destroy the promise // so that it never gets set the value again. void SetValueAndRemovePromise(const std::string& message_id, absl::Status status) ABSL_EXCLUSIVE_LOCKS_REQUIRED(request_delivery_status_mtx_); absl::BitGen gen_ ABSL_GUARDED_BY(reactor_mtx_); // Dedicated to reading the Response from the server. std::thread read_response_thread_; AgentConnectionId connection_id_ ABSL_GUARDED_BY(reactor_mtx_); // Mutex to protect the reactor_ and other variables that are only accessed // by the reactor_ for write operations. absl::Mutex reactor_mtx_ ABSL_ACQUIRED_BEFORE(request_delivery_status_mtx_); // Generates a new stub when the client needs to restart the client. We have a // default implementation, this function pointer allows us to pass in a custom // implementation for testing purposes and other use cases. absl::AnyInvocable<std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub>()> stub_generator_ = nullptr; // Generates a new connection id when the client needs to restart the client. // We have a default implementation, this function pointer allows us to pass // in a custom implementation for testing purposes and other use cases. absl::AnyInvocable<absl::StatusOr<AgentConnectionId>()> connection_id_generator_ = nullptr; // Reactor to handle the gRPC communication with the server. std::unique_ptr<AcsAgentClientReactor> reactor_ ABSL_GUARDED_BY(reactor_mtx_); // Callback injected by the client to process the messages from the server. 
absl::AnyInvocable<void( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse)> read_callback_; // Mutex to protect the attempted_requests_responses_sub_. absl::Mutex request_delivery_status_mtx_; // Key: message id of each attempted request. Value: promise to be set when // the response from the server is received in AckOnSuccessfulDelivery(). std::unordered_map<std::string, std::promise<absl::Status>> ABSL_GUARDED_BY( request_delivery_status_mtx_) attempted_requests_responses_sub_; // Mutex that serves as a channel to pass the message from OnReadDone of // reactor to the ClientReadMessage(). absl::Mutex response_read_mtx_; // State of the client. enum class ClientState { // The client is ready to read any Response from the server. kReady, // Stream not initialized, waiting for the first registration request. kStreamNotInitialized, // The RPC is temporarily down, waiting for a restart. kStreamTemporarilyDown, // The RPC failed to be initialized. kStreamFailedToInitialize, // The client is being shutdown. kShutdown, }; // State of the client read thread. ClientState client_read_state_ ABSL_GUARDED_BY(response_read_mtx_) = ClientState::kReady; // State of the stream/RPC. ClientState stream_state_ ABSL_GUARDED_BY(reactor_mtx_) = ClientState::kStreamNotInitialized; std::thread restart_client_thread_; // Buffer to store the Response read from OnReadDone of reactor. std::queue<google::cloud::agentcommunication::v1::StreamAgentMessagesResponse> msg_responses_ ABSL_GUARDED_BY(response_read_mtx_); // Maximum time to wait for the acknowledgement from server. const std::chrono::milliseconds max_wait_time_for_ack_; }; } // namespace agent_communication #endif // THIRD_PARTY_AGENTCOMMUNICATION_CLIENT_CPP_ACS_AGENT_CLIENT_H_