e2e-examples/gcs/dummy_server/main.cc (163 lines of code) (raw):

// Copyright 2025 gRPC authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #include <grpcpp/ext/proto_server_reflection_plugin.h> #include <grpcpp/grpcpp.h> #include <grpcpp/health_check_service_interface.h> #include <algorithm> #include <fstream> #include <iostream> #include <memory> #include <string> #include "absl/flags/flag.h" #include "absl/flags/parse.h" #include "absl/strings/str_format.h" #include "e2e-examples/gcs/dummy_server/gcs_util.h" #include "google/storage/v2/storage.grpc.pb.h" using grpc::CallbackServerContext; using grpc::Server; using grpc::ServerBuilder; using grpc::ServerUnaryReactor; using grpc::Status; using grpc::StatusCode; using ::google::storage::v2::GetObjectRequest; using ::google::storage::v2::Object; using ::google::storage::v2::ReadObjectRequest; using ::google::storage::v2::ReadObjectResponse; using ::google::storage::v2::StartResumableWriteRequest; using ::google::storage::v2::StartResumableWriteResponse; using ::google::storage::v2::WriteObjectRequest; using ::google::storage::v2::WriteObjectResponse; ABSL_FLAG(uint16_t, port, 50051, "Server port for the service"); ABSL_FLAG(std::string, cred, "insecure", "Credential type (insecure,ssl,alts)"); ABSL_FLAG(std::string, ssl_key, "", "Path to the server private key file"); ABSL_FLAG(std::string, ssl_cert, "", "Path to the server SSL certification chain file"); // Logic and data behind the server's behavior. class StorageServiceImpl final : public google::storage::v2::Storage::CallbackService { ServerUnaryReactor* GetObject(CallbackServerContext* context, const GetObjectRequest* request, Object* reply) override { Status status; const int64_t object_size = GcsUtil::GetObjectSize(request->bucket(), request->object()); if (object_size < 0) { status = Status(StatusCode::NOT_FOUND, "Object is not found"); } else { status = Status::OK; reply->set_size(object_size); } ServerUnaryReactor* reactor = context->DefaultReactor(); reactor->Finish(status); return reactor; } grpc::ServerWriteReactor<ReadObjectResponse>* ReadObject( CallbackServerContext* context, const ReadObjectRequest* request) override { class Reactor : public grpc::ServerWriteReactor<ReadObjectResponse> { public: Reactor(const ReadObjectRequest* request) { const int64_t object_size = GcsUtil::GetObjectSize(request->bucket(), request->object()); if (object_size < 0) { Finish(Status(StatusCode::NOT_FOUND, "Object is not found")); return; } left_size_ = object_size; if (request->read_limit() > 0) { left_size_ = std::min(left_size_, request->read_limit()); } chunk_data_ = absl::Cord(GcsUtil::GetObjectDataChunk( google::storage::v2::ServiceConstants::MAX_READ_CHUNK_BYTES)); MaybeWriteNext(); } void OnWriteDone(bool ok) override { if (ok) { MaybeWriteNext(); } else { Finish(grpc::Status(grpc::StatusCode::UNKNOWN, "Unexpected failure")); } } void OnDone() override { delete this; } private: void MaybeWriteNext() { if (left_size_ == 0) { Finish(grpc::Status::OK); return; } auto data_size = std::min(left_size_, static_cast<int64_t>(chunk_data_.size())); response_.mutable_checksummed_data()->set_content( chunk_data_.Subcord(0, data_size)); left_size_ -= data_size; StartWrite(&response_); } private: int64_t left_size_; absl::Cord chunk_data_; ReadObjectResponse response_; }; return new Reactor(request); } grpc::ServerReadReactor<WriteObjectRequest>* WriteObject( CallbackServerContext* context, WriteObjectResponse* response) override { class Reactor : public grpc::ServerReadReactor<WriteObjectRequest> { public: explicit Reactor(WriteObjectResponse* response) { StartRead(&request_); } void OnReadDone(bool ok) override { if (!ok) { Finish(grpc::Status::OK); return; } StartRead(&request_); } void OnDone() override { delete this; } private: WriteObjectRequest request_; WriteObjectResponse* response_ = nullptr; }; return new Reactor(response); } }; static std::string LoadStringFromFile(std::string path) { std::ifstream file(path); if (!file.is_open()) { std::cout << "Failed to open " << path << std::endl; abort(); } std::stringstream sstr; sstr << file.rdbuf(); return sstr.str(); } void RunServer(uint16_t port) { std::string server_address = absl::StrFormat("0.0.0.0:%d", port); grpc::EnableDefaultHealthCheckService(true); grpc::reflection::InitProtoReflectionServerBuilderPlugin(); ServerBuilder builder; const std::string cred = absl::GetFlag(FLAGS_cred); if (cred == "insecure") { builder.AddListeningPort(server_address, grpc::InsecureServerCredentials()); } else if (cred == "ssl") { grpc::SslServerCredentialsOptions::PemKeyCertPair key_cert_pair = { LoadStringFromFile(absl::GetFlag(FLAGS_ssl_key)), LoadStringFromFile(absl::GetFlag(FLAGS_ssl_cert))}; grpc::SslServerCredentialsOptions ssl_options; ssl_options.pem_key_cert_pairs.emplace_back(key_cert_pair); builder.AddListeningPort(server_address, grpc::SslServerCredentials(ssl_options)); } else if (cred == "alts") { grpc::experimental::AltsServerCredentialsOptions alts_opts; builder.AddListeningPort( server_address, grpc::experimental::AltsServerCredentials(alts_opts)); } else { std::cout << "Unknown cred type: " << cred << std::endl; exit(1); } StorageServiceImpl service; builder.RegisterService(&service); std::unique_ptr<Server> server(builder.BuildAndStart()); std::cout << "Server listening on " << server_address << std::endl; server->Wait(); } int main(int argc, char** argv) { absl::ParseCommandLine(argc, argv); RunServer(absl::GetFlag(FLAGS_port)); return 0; }