cpp/acs_agent_client_reactor.h (107 lines of code) (raw):

#ifndef THIRD_PARTY_AGENTCOMMUNICATION_CLIENT_CPP_ACS_AGENT_CLIENT_REACTOR_H_ #define THIRD_PARTY_AGENTCOMMUNICATION_CLIENT_CPP_ACS_AGENT_CLIENT_REACTOR_H_ #include <cstdint> #include <map> #include <memory> #include <queue> #include <string> #include "proto/agent_communication.grpc.pb.h" #include "absl/base/thread_annotations.h" #include "absl/functional/any_invocable.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/synchronization/mutex.h" #include "cpp/acs_agent_helper.h" #include "grpcpp/client_context.h" #include "grpcpp/support/client_callback.h" #include "grpcpp/support/status.h" #include "grpcpp/support/string_ref.h" namespace agent_communication { // gRPC callback-based client reactor for the StreamAgentMessages RPC. This // class is thread-safe. // This reactor provides a public API to add requests to the buffer of the // reactor. Only one request message is buffered at a time to allow the caller // to have precise control the flow of the requests (QPS and BPS). On Read, this // reactor will try to add a response to the write queue if the read message has // a MessageBody, and it will call the read_callback_ to process the response. // On write, this reactor will prioritize the write of ack over message. class AcsAgentClientReactor : public grpc::ClientBidiReactor< google::cloud::agentcommunication::v1::StreamAgentMessagesRequest, google::cloud::agentcommunication::v1::StreamAgentMessagesResponse> { public: // Status of the RPC to be passed to the read_callback_ to let the caller know // the status of the RPC. enum class RpcStatus { kRpcOk, // Rpc is normal. kRpcClosedByClient, // The RPC is force cancelled by the client. kRpcClosedByServer, // The RPC is being terminated by the server. }; // Test only constructor to create client locally without connecting through a // channel to a real ACS server. explicit AcsAgentClientReactor( std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub> stub, absl::AnyInvocable<void( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse, RpcStatus)> read_callback); explicit AcsAgentClientReactor( std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub> stub, absl::AnyInvocable<void( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse, RpcStatus)> read_callback, const AgentConnectionId& agent_connection_id); // Creates a stub to connect to the ACS server. static std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub> CreateStub(const std::string& endpoint); // Adds new request to the buffer of reactor. // Return: boolean on whether an addition of request is successful. This will // be false iff there is a write operation in flight. bool AddRequest( const google::cloud::agentcommunication::v1::StreamAgentMessagesRequest& request) ABSL_LOCKS_EXCLUDED(request_mtx_); // Returns the quota of (# of messages per minute & # of bytes per minute) for // the agent to send messages to the server. These values are retrieved from // server's initial metadata. absl::StatusOr<uint64_t> GetMessagesPerMinuteQuota() ABSL_LOCKS_EXCLUDED(status_mtx_); absl::StatusOr<uint64_t> GetBytesPerMinuteQuota() ABSL_LOCKS_EXCLUDED(status_mtx_); // Waits for the RPC termination status. // This function will listen to the OnDone callback and return the status. grpc::Status Await() ABSL_LOCKS_EXCLUDED(status_mtx_); // Cancels the RPC if it is not terminated yet. bool Cancel() ABSL_LOCKS_EXCLUDED(status_mtx_); ~AcsAgentClientReactor(); private: // Override methods of ClientBidiReactor. These methods are called by gRPC // framework. OnWriteDone is called when the write operation is done. // OnReadDone is called when the read operation is done. // OnReadInitialMetadataDone is called when the read initial metadata from // server operation is done. OnDone is called when the RPC is terminated. void OnWriteDone(bool ok) ABSL_LOCKS_EXCLUDED(request_mtx_) override; void OnReadDone(bool ok) ABSL_LOCKS_EXCLUDED(request_mtx_) ABSL_LOCKS_EXCLUDED(status_mtx_) override; void OnReadInitialMetadataDone(bool ok) ABSL_LOCKS_EXCLUDED(status_mtx_) override; void OnDone(const grpc::Status& status) ABSL_LOCKS_EXCLUDED(status_mtx_) override; // Gets the integer value of the key from the initial metadata of server. template <typename T> absl::StatusOr<T> GetIntValueFromInitialMetadata( const std::multimap<grpc::string_ref, grpc::string_ref>& metadata, const std::string& key); // Adds a response to the queue of ack_buffer_. This function will be called // when a message with MessageBody type is received from the server in // OnReadDone(). void Ack(std::string message_id) ABSL_LOCKS_EXCLUDED(request_mtx_); // Triggers a StartWrite operation if there is no write operation in flight. // This function will be called when a new request is added to the buffer of // reactor, a new ack is added to the buffer, or a write operation is // completed. void NextWrite() ABSL_EXCLUSIVE_LOCKS_REQUIRED(request_mtx_); std::unique_ptr< google::cloud::agentcommunication::v1::AgentCommunication::Stub> stub_; grpc::ClientContext context_; // Callback invoked in OnReadDone to process the response. // Injected during construction to allow the caller of this class to // control how to process the messages from the server. absl::AnyInvocable<void( google::cloud::agentcommunication::v1::StreamAgentMessagesResponse, RpcStatus)> read_callback_; // Buffer to store the response in StartRead. google::cloud::agentcommunication::v1::StreamAgentMessagesResponse response_; // Mutex to protect variables related to the termination of the RPC. absl::Mutex status_mtx_; // The final status of the RPC. To be returned by Await() for consumption of // the client. grpc::Status rpc_final_status_ ABSL_GUARDED_BY(status_mtx_); // Whether the RPC has been terminated. This is to indicate to the Await() // function that the RPC has been terminated and it can return the final // status. bool rpc_done_ ABSL_GUARDED_BY(status_mtx_) = false; // Whether the RPC has been cancelled forcefully by Cancel(). // This is to indicate to the read_callback_ that the RPC has been // cancelled by the user/client. bool rpc_cancelled_by_client_ ABSL_GUARDED_BY(status_mtx_) = false; // Quota of (# of messages per minute & # of bytes per minute) for the agent // to send messages to the server. This value is retrieved from server's // initial metadata. absl::StatusOr<uint64_t> messages_per_minute_quota_ ABSL_GUARDED_BY( status_mtx_) = absl::FailedPreconditionError("stream not initialized."); absl::StatusOr<uint64_t> bytes_per_minute_quota_ ABSL_GUARDED_BY( status_mtx_) = absl::FailedPreconditionError("stream not initialized."); // Keys for the quota in the initial metadata of server. inline static constexpr char kMessagesPerMinuteQuotaKey[] = "agent-communication-message-rate-limit"; inline static constexpr char kBytesPerMinuteQuotaKey[] = "agent-communication-bandwidth-limit"; // Mutex to protect all variables related to write operations. absl::Mutex request_mtx_; // Whether there is a write operation in flight. bool writing_ ABSL_GUARDED_BY(request_mtx_) = false; // Raw pointer to the message request to be written to the stream. It can // point to either a RegisterConnection, a MessageResponse, or a MessageBody // request. This is to be passed into StartWrite() for writing to the stream. google::cloud::agentcommunication::v1::StreamAgentMessagesRequest* request_ ABSL_GUARDED_BY(request_mtx_) = nullptr; // Buffer to store the RegisterConnection or MessageBody request added by the // client. std::unique_ptr< google::cloud::agentcommunication::v1::StreamAgentMessagesRequest> msg_request_ ABSL_GUARDED_BY(request_mtx_) = nullptr; // Buffer to store the MessageResponse that is inserted during OnReadDone for // each MessageBody received from the server. std::queue<std::unique_ptr< google::cloud::agentcommunication::v1::StreamAgentMessagesRequest>> ack_buffer_ ABSL_GUARDED_BY(request_mtx_); }; } // namespace agent_communication #endif // THIRD_PARTY_AGENTCOMMUNICATION_CLIENT_CPP_ACS_AGENT_CLIENT_REACTOR_H_