frontend/handlers/change_streams.h (64 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #ifndef THIRD_PARTY_CLOUD_SPANNER_EMULATOR_FRONTEND_HANDLERS_CHANGE_STREAMS_H_ #define THIRD_PARTY_CLOUD_SPANNER_EMULATOR_FRONTEND_HANDLERS_CHANGE_STREAMS_H_ #include <memory> #include <string> #include "google/spanner/v1/spanner.pb.h" #include "absl/flags/declare.h" #include "absl/flags/flag.h" #include "absl/status/status.h" #include "absl/status/statusor.h" #include "absl/time/time.h" #include "backend/query/change_stream/change_stream_query_validator.h" #include "backend/query/query_engine.h" #include "frontend/entities/session.h" #include "frontend/server/handler.h" ABSL_DECLARE_FLAG(bool, cloud_spanner_emulator_test_with_fake_partition_table); namespace google { namespace spanner { namespace emulator { namespace frontend { // Sub-handler for change stream queries. There is no direct grpc request // registered with this handler. Rather, if an incoming sql query is detected // as a change stream query, we wire the query from the generic // ExecuteStreamingSql handler to this specific handler. class ChangeStreamsHandler { public: static constexpr char kTestPartitionTable[] = "partition_table"; static constexpr char kTestDataTable[] = "data_table"; explicit ChangeStreamsHandler( backend::ChangeStreamQueryValidator::ChangeStreamMetadata& metadata) : metadata_(metadata) { // Name of the partition&data table to be read from. For certain test cases // this need to be set to test only mock tables. partition_table_ = absl::GetFlag( FLAGS_cloud_spanner_emulator_test_with_fake_partition_table) ? kTestPartitionTable : metadata.partition_table; } absl::Status ExecuteChangeStreamQuery( const spanner_api::ExecuteSqlRequest* request, ServerStream<spanner_api::PartialResultSet>* stream, std::shared_ptr<Session> session); // Execute change stream initial query when partition token is null. absl::Status ExecuteInitialQuery( std::shared_ptr<Session> session, ServerStream<spanner_api::PartialResultSet>* stream); absl::StatusOr<absl::Time> TryGetPartitionTokenEndTime( std::shared_ptr<Session> session, absl::Time read_ts) const; // Execute change stream partition query when partition token is non null. absl::Status ExecutePartitionQuery( ServerStream<spanner_api::PartialResultSet>* stream, std::shared_ptr<Session> session); backend::Query ConstructPartitionTablePartitionQuery() const; backend::Query ConstructDataTablePartitionQuery(absl::Time start, absl::Time end) const; absl::Status ProcessDataChangeRecordsAndStreamBack( backend::QueryResult& result, bool expect_heartbeat, absl::Time scan_end, bool& expect_metadata, absl::Time* last_record_time, ServerStream<spanner_api::PartialResultSet>* stream); const backend::ChangeStreamQueryValidator::ChangeStreamMetadata& metadata() const { return metadata_; } private: const backend::ChangeStreamQueryValidator::ChangeStreamMetadata& metadata_; std::string partition_table_; }; } // namespace frontend } // namespace emulator } // namespace spanner } // namespace google #endif // THIRD_PARTY_CLOUD_SPANNER_EMULATOR_FRONTEND_HANDLERS_CHANGE_STREAMS_H_