cpp/fake_acs_agent_server_reactor.h (102 lines of code) (raw):
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include "proto/agent_communication.grpc.pb.h"
#include "absl/base/thread_annotations.h"
#include "absl/functional/any_invocable.h"
#include "absl/log/absl_log.h"
#include "absl/synchronization/mutex.h"
#include "grpcpp/impl/service_type.h"
#include "grpcpp/security/server_credentials.h"
#include "grpcpp/server_builder.h"
#include "grpcpp/server_context.h"
#include "grpcpp/support/server_callback.h"
#include "grpcpp/support/status.h"
namespace agent_communication {
// Fake ACS agent server reactor.
// This class is used to create a fake ACS agent server for testing the
// functionality of the client reactor.
class FakeAcsAgentServerReactor
: public grpc::ServerBidiReactor<
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest,
google::cloud::agentcommunication::v1::StreamAgentMessagesResponse> {
public:
explicit FakeAcsAgentServerReactor(
std::function<void(
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest)>
read_callback)
: read_callback_(read_callback) {
StartRead(&request_);
}
// Adds a response to the buffer of the server reactor and triggers a write
// operation if there is no write operation in flight.
void AddResponse(
std::unique_ptr<
google::cloud::agentcommunication::v1::StreamAgentMessagesResponse>
response) ABSL_LOCKS_EXCLUDED(response_mtx_);
private:
// Override methods of ServerBidiReactor. These methods are called by gRPC
// framework. OnDone is called when the RPC is terminated. OnCancel is called
// when the RPC is cancelled by the client. OnReadDone is called when the read
// operation is done. OnWriteDone is called when the write operation is done.
void OnDone() override;
void OnCancel() override { ABSL_LOG(INFO) << "server reactor is cancelled"; }
void OnReadDone(bool ok) ABSL_LOCKS_EXCLUDED(response_mtx_) override;
void OnWriteDone(bool ok) ABSL_LOCKS_EXCLUDED(response_mtx_) override;
// Triggers a StartWrite operation if there is no write operation in flight
// and there is a non-empty response in the buffer.
void NextWrite() ABSL_EXCLUSIVE_LOCKS_REQUIRED(response_mtx_);
// Callback invoked in OnReadDone to process the request from the client.
// Injected during construction to allow the caller of this class to
// control how to process the messages from the client.
std::function<void(
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest)>
read_callback_;
absl::Mutex response_mtx_;
// Whether there is a write operation in flight.
bool writing_ ABSL_GUARDED_BY(response_mtx_) = false;
// Buffer to store the responses to be sent to the client.
std::queue<std::unique_ptr<
google::cloud::agentcommunication::v1::StreamAgentMessagesResponse> >
responses_ ABSL_GUARDED_BY(response_mtx_);
// Buffer to store the request received from the client.
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest request_;
};
// Fake ACS agent service implementation.
// This class is used to create a fake ACS agent service for testing the
// functionality of the client reactor.
class FakeAcsAgentServiceImpl final
: public google::cloud::agentcommunication::v1::AgentCommunication::
CallbackService {
public:
explicit FakeAcsAgentServiceImpl(
std::function<void(
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest)>
read_callback)
: read_callback_(read_callback) {}
explicit FakeAcsAgentServiceImpl(
std::function<void(
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest)>
read_callback,
std::multimap<std::string, std::string> initial_metadata)
: read_callback_(read_callback), initial_metadata_(initial_metadata) {}
grpc::ServerBidiReactor<
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest,
google::cloud::agentcommunication::v1::StreamAgentMessagesResponse>*
StreamAgentMessages(grpc::CallbackServerContext* context)
ABSL_LOCKS_EXCLUDED(reactor_mtx_) override;
// Adds initial metadata to the server context. The metadata needs to be
// called after the reactor is created, before the reactor writes any message.
void AddInitialMetadata(const std::string& key, const std::string& value);
// Adds a response to the buffer of the server reactor.
void AddResponse(
std::unique_ptr<
google::cloud::agentcommunication::v1::StreamAgentMessagesResponse>
response) ABSL_LOCKS_EXCLUDED(reactor_mtx_);
bool IsReactorCreated() ABSL_LOCKS_EXCLUDED(reactor_mtx_);
private:
absl::Mutex reactor_mtx_;
grpc::CallbackServerContext* context_ ABSL_GUARDED_BY(reactor_mtx_);
FakeAcsAgentServerReactor* reactor_ ABSL_GUARDED_BY(reactor_mtx_) = nullptr;
std::function<void(
google::cloud::agentcommunication::v1::StreamAgentMessagesRequest)>
read_callback_;
std::multimap<std::string, std::string> initial_metadata_
ABSL_GUARDED_BY(reactor_mtx_);
};
// Fake ACS agent server.
// This class is used to create a fake ACS agent server for testing the
// functionality of the client reactor.
class FakeAcsAgentServer {
public:
explicit FakeAcsAgentServer(grpc::Service* service);
void Shutdown(std::chrono::system_clock::time_point deadline)
ABSL_LOCKS_EXCLUDED(server_mtx_);
void Wait() ABSL_LOCKS_EXCLUDED(server_mtx_);
std::string GetServerAddress() ABSL_LOCKS_EXCLUDED(server_mtx_);
private:
absl::Mutex server_mtx_;
std::unique_ptr<grpc::Server> server_ ABSL_GUARDED_BY(server_mtx_);
std::string server_address_ ABSL_GUARDED_BY(server_mtx_);
std::shared_ptr<grpc::ServerCredentials> server_credentials_
ABSL_GUARDED_BY(server_mtx_);
};
} // namespace agent_communication