frontend/handlers/change_streams.cc (345 lines of code) (raw):

// // Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // #include "frontend/handlers/change_streams.h" #include <algorithm> #include <memory> #include <optional> #include <string> #include <vector> #include "google/spanner/v1/spanner.pb.h" #include "google/spanner/v1/transaction.pb.h" #include "absl/flags/flag.h" #include "absl/log/log.h" #include "absl/status/status.h" #include "absl/strings/substitute.h" #include "absl/time/time.h" #include "backend/access/read.h" #include "backend/query/change_stream/change_stream_query_validator.h" #include "backend/query/query_engine.h" #include "backend/schema/catalog/schema.h" #include "common/clock.h" #include "common/errors.h" #include "frontend/converters/change_streams.h" #include "frontend/converters/pg_change_streams.h" #include "frontend/converters/time.h" #include "frontend/entities/session.h" #include "frontend/entities/transaction.h" #include "frontend/server/handler.h" #include "zetasql/base/ret_check.h" #include "zetasql/base/status_macros.h" ABSL_FLAG(bool, cloud_spanner_emulator_test_with_fake_partition_table, false, "TEST ONLY. Set to true to enable querying against mocked change " "stream internal partition table during test."); ABSL_FLAG( absl::Duration, change_streams_partition_query_chop_interval, absl::Milliseconds(100), "Change streams chopped interval for partition query in milliseconds."); namespace google { namespace spanner { namespace emulator { namespace frontend { namespace { // We only allow single-use read-only strong transaction for change stream // queries absl::Status ValidateTransactionSelectorForChangeStreamQuery( const spanner_api::TransactionSelector& selector) { // Default transaction selector is single use read only strong transaction, so // directly returns ok if (selector.selector_case() == spanner_api::TransactionSelector::SELECTOR_NOT_SET) { return absl::OkStatus(); } if (selector.selector_case() != spanner_api::TransactionSelector::SelectorCase::kSingleUse) { return error::ChangeStreamQueriesMustBeSingleUseOnly(); } if (!selector.single_use().read_only().has_strong()) { return error::ChangeStreamQueriesMustBeStrongReads(); } return absl::OkStatus(); } absl::Status VerifyChangeStreamExistence(const std::string& change_stream_name, const backend::Schema* schema) { auto change_stream = schema->FindChangeStream(change_stream_name); if (change_stream != nullptr) { return absl::OkStatus(); } return error::ChangeStreamNotFound(change_stream_name); } absl::StatusOr<absl::Duration> TryGetChangeStreamRetentionPeriod( const std::string& change_stream_name, std::shared_ptr<Session> session, absl::Time read_ts) { spanner_api::TransactionOptions txn_options; txn_options.mutable_read_only()->set_return_read_timestamp(false); // If the user provided tvf start time is past now, wait until this future // time to perform a read on partition token end time. ZETASQL_ASSIGN_OR_RETURN( *txn_options.mutable_read_only()->mutable_min_read_timestamp(), TimestampToProto(read_ts)); ZETASQL_ASSIGN_OR_RETURN(auto txn, session->CreateSingleUseTransaction(txn_options)); auto change_stream = txn->schema()->FindChangeStream(change_stream_name); if (change_stream != nullptr) { return absl::Seconds(change_stream->parsed_retention_period()); } return error::ChangeStreamNotFound(change_stream_name); } bool IsQueryResultEmpty(backend::QueryResult& result) { return result.num_output_rows == 0; } absl::Status ValidateTokenInRetentionWindow( const absl::Time tvf_start, const absl::Time current_chopped_start, const absl::Time current_token_end, const absl::Duration current_retention) { absl::Time gc_time = Clock().Now() - current_retention; // If current chopped start is before gc_time, we know this token must be // invalid and need to identify the correct error type. if (current_chopped_start < gc_time) { // If current token already end before gc_time, this entire partition has // expired. if (current_token_end < gc_time) { return error::ChangeStreamStalePartition(); } // Although token is not expired, the user provided tvf start time is too // old. return error::InvalidChangeStreamTvfArgumentStartTimestampTooOld( absl::FormatTime(gc_time), absl::FormatTime(tvf_start)); } return absl::OkStatus(); } } // namespace absl::Status ChangeStreamsHandler::ProcessDataChangeRecordsAndStreamBack( backend::QueryResult& result, const bool expect_heartbeat, const absl::Time scan_end, bool& expect_metadata, absl::Time* last_record_time, ServerStream<spanner_api::PartialResultSet>* stream) { std::vector<spanner_api::PartialResultSet> responses; if (IsQueryResultEmpty(result) && expect_heartbeat) { ZETASQL_ASSIGN_OR_RETURN( responses, metadata().is_pg ? ConvertHeartbeatTimestampToJson(scan_end, metadata().tvf_name, expect_metadata) : ConvertHeartbeatTimestampToStruct(scan_end, expect_metadata)); expect_metadata = false; *last_record_time = scan_end; } else if (!IsQueryResultEmpty(result)) { ZETASQL_ASSIGN_OR_RETURN(responses, metadata().is_pg ? ConvertDataTableRowCursorToJson( result.rows.get(), metadata().tvf_name, expect_metadata) : ConvertDataTableRowCursorToStruct( result.rows.get(), expect_metadata)); *last_record_time = scan_end; expect_metadata = false; } for (auto& response : responses) { stream->Send(response); } return absl::OkStatus(); } absl::Status ChangeStreamsHandler::ExecuteInitialQuery( std::shared_ptr<Session> session, ServerStream<spanner_api::PartialResultSet>* stream) { spanner_api::TransactionOptions txn_options; ZETASQL_ASSIGN_OR_RETURN( *txn_options.mutable_read_only()->mutable_min_read_timestamp(), TimestampToProto(metadata().start_timestamp)); txn_options.mutable_read_only()->set_return_read_timestamp(false); ZETASQL_ASSIGN_OR_RETURN(auto txn, session->CreateSingleUseTransaction(txn_options)); return txn->GuardedCall(Transaction::OpType::kSql, [&]() -> absl::Status { ZETASQL_RETURN_IF_ERROR(VerifyChangeStreamExistence(metadata().change_stream_name, txn->schema())); backend::Query initial_query = backend::Query{absl::Substitute( "SELECT start_time, partition_token, parents " "FROM $0 " "WHERE '$1' >= start_time AND ( end_time IS NULL OR '$1' < end_time " ") ORDER BY (partition_token)", partition_table_, metadata().start_timestamp)}; initial_query.change_stream_internal_lookup = metadata().change_stream_name; ZETASQL_ASSIGN_OR_RETURN(auto partition_results, txn->ExecuteSql(initial_query)); // Initial query is guaranteed to return at least 1 child partition record. ZETASQL_RET_CHECK(!IsQueryResultEmpty(partition_results)); ZETASQL_ASSIGN_OR_RETURN( auto responses, metadata().is_pg ? ConvertPartitionTableRowCursorToJson(partition_results.rows.get(), metadata().start_timestamp, metadata().tvf_name, /*need_metadata=*/true) : ConvertPartitionTableRowCursorToStruct( partition_results.rows.get(), metadata().start_timestamp, /*need_metadata=*/true)); for (auto& response : responses) { stream->Send(response); } return absl::OkStatus(); }); } absl::StatusOr<absl::Time> ChangeStreamsHandler::TryGetPartitionTokenEndTime( std::shared_ptr<Session> session, absl::Time read_ts) const { absl::Time start, end; spanner_api::TransactionOptions txn_options; txn_options.mutable_read_only()->set_return_read_timestamp(false); // If the user provided tvf start time is past now, wait until this future // time to perform a read on partition token end time. ZETASQL_ASSIGN_OR_RETURN(*txn_options.mutable_read_only()->mutable_read_timestamp(), TimestampToProto(read_ts)); ZETASQL_ASSIGN_OR_RETURN(auto txn, session->CreateSingleUseTransaction(txn_options)); ZETASQL_RETURN_IF_ERROR( txn->GuardedCall(Transaction::OpType::kSql, [&]() -> absl::Status { backend::Query get_partition_token_time_query = backend::Query{absl::Substitute( "SELECT start_time, end_time " "FROM $0 " "WHERE( partition_token = '$1' )", partition_table_, metadata().partition_token.value())}; get_partition_token_time_query.change_stream_internal_lookup = metadata().change_stream_name; ZETASQL_ASSIGN_OR_RETURN(auto token_time_results, txn->ExecuteSql(get_partition_token_time_query)); if (IsQueryResultEmpty(token_time_results)) { return error:: InvalidChangeStreamTvfArgumentPartitionTokenInvalidChangeStreamName( // NOLINT metadata().partition_token.value()); } backend::RowCursor* cursor = token_time_results.rows.get(); cursor->Next(); start = cursor->ColumnValue(0).ToTime(); // If the end_time of current partition token is null, set the returning // end time to InfiniteFuture(). end = cursor->ColumnValue(1).is_null() ? absl::InfiniteFuture() : cursor->ColumnValue(1).ToTime(); // Return error if user provided tvf start time is not within the // lifetime of the user provided partition token. if (metadata().start_timestamp < start || metadata().start_timestamp > end) { return error:: InvalidChangeStreamTvfArgumentStartTimestampForPartition( absl::FormatTime(start), absl::FormatTime(end), absl::FormatTime(metadata().start_timestamp)); } return absl::OkStatus(); })); return end; } backend::Query ChangeStreamsHandler::ConstructDataTablePartitionQuery( absl::Time start, absl::Time end) const { // If user passed end_timestamp is not null and current scan is the last scan // in query lifetime, we do an inclusive scan to include the data change // record with commit_timestamp exactly at the user passed end_timestamp. If // current scan is a middle chopped scan, we do an exclusive scan because all // data records of a partition token has a commit_timestamp in // [partition_start_time,partition_end_time). const bool is_inclusive_read = metadata().end_timestamp.has_value() && metadata().end_timestamp.value() == end; backend::Query data_table_partition_query = backend::Query{absl::Substitute( "SELECT * " "FROM $0 " "WHERE( partition_token='$1' AND commit_timestamp >= '$2' AND " "commit_timestamp $3 '$4' ) ORDER BY partition_token, commit_timestamp, " "server_transaction_id,record_sequence", metadata().data_table, metadata().partition_token.value(), start, is_inclusive_read ? "<=" : "<", end)}; data_table_partition_query.change_stream_internal_lookup = metadata().change_stream_name; return data_table_partition_query; } backend::Query ChangeStreamsHandler::ConstructPartitionTablePartitionQuery() const { backend::Query data_table_partition_query = backend::Query{absl::Substitute( "SELECT start_time, partition_token, parents FROM $0 " "WHERE (ARRAY_INCLUDES((SELECT children FROM $0 WHERE " "partition_token = '$1'), partition_token)) ORDER BY(partition_token)", partition_table_, metadata().partition_token.value())}; data_table_partition_query.change_stream_internal_lookup = metadata().change_stream_name; return data_table_partition_query; } absl::Status ChangeStreamsHandler::ExecutePartitionQuery( ServerStream<spanner_api::PartialResultSet>* stream, std::shared_ptr<Session> session) { const absl::Time tvf_end = metadata().end_timestamp.has_value() ? metadata().end_timestamp.value() : absl::InfiniteFuture(); const absl::Time now = Clock().Now(); const absl::Duration heartbeat_interval = absl::Milliseconds(metadata().heartbeat_milliseconds); absl::Time last_record_time = now; absl::Time partition_token_end_time = absl::InfiniteFuture(); absl::Time current_start = metadata().start_timestamp; absl::Time current_end = std::min( std::max(now, current_start + absl::GetFlag( FLAGS_change_streams_partition_query_chop_interval)), tvf_end); // Metadata is only expected for the first response to users in a single // query's lifetime. bool expect_metadata = true; while (current_start <= tvf_end && current_start < partition_token_end_time) { // For historical queries where tvf end is in the past, set the read // transaction snapshot time to now to prevent >1h stale read, which is now // allowed. absl::Time current_txn_snapshot_time = std::max(current_end, now); // Get the newest retention period so most up to date retention will apply // to curent running query. ZETASQL_ASSIGN_OR_RETURN( absl::Duration current_retention, TryGetChangeStreamRetentionPeriod(metadata().change_stream_name, session, current_txn_snapshot_time)); spanner_api::TransactionOptions txn_options; // If the partition token hasn't been churned yet, we re-scan the partition // table to see if the end time has been churned and update the partition // end time. if (partition_token_end_time == absl::InfiniteFuture()) { ZETASQL_ASSIGN_OR_RETURN( partition_token_end_time, TryGetPartitionTokenEndTime(session, current_txn_snapshot_time)); } ZETASQL_RETURN_IF_ERROR(ValidateTokenInRetentionWindow( metadata().start_timestamp, current_start, partition_token_end_time, current_retention)); // Only scan data records up to minimum of current chopped end time // and end time of current partition token. const absl::Time scan_end = std::min(partition_token_end_time, current_end); const bool expect_heartbeat = current_end - last_record_time >= heartbeat_interval; // This transaction will be blocked until now passes current_end. ZETASQL_ASSIGN_OR_RETURN(*txn_options.mutable_read_only()->mutable_read_timestamp(), TimestampToProto(current_txn_snapshot_time)); ZETASQL_ASSIGN_OR_RETURN(auto txn, session->CreateSingleUseTransaction(txn_options)); absl::Status status = txn->GuardedCall(Transaction::OpType::kSql, [&]() -> absl::Status { backend::Query read_data_query = ConstructDataTablePartitionQuery(current_start, scan_end); ZETASQL_ASSIGN_OR_RETURN(auto data_records_results, txn->ExecuteSql(read_data_query)); ZETASQL_RETURN_IF_ERROR(ProcessDataChangeRecordsAndStreamBack( data_records_results, expect_heartbeat, scan_end, expect_metadata, &last_record_time, stream)); if (partition_token_end_time <= current_end) { // Get child partition records after all data records are returned // in current query. backend::Query tail_query_partition_table = ConstructPartitionTablePartitionQuery(); ZETASQL_ASSIGN_OR_RETURN(auto tail_partition_records_results, txn->ExecuteSql(tail_query_partition_table)); ZETASQL_RET_CHECK(!IsQueryResultEmpty(tail_partition_records_results)); ZETASQL_ASSIGN_OR_RETURN( auto responses, metadata().is_pg ? ConvertPartitionTableRowCursorToJson( tail_partition_records_results.rows.get(), /*initial_start_time=*/std::nullopt, metadata().tvf_name, expect_metadata) : ConvertPartitionTableRowCursorToStruct( tail_partition_records_results.rows.get(), /*initial_start_time=*/std::nullopt, expect_metadata)); expect_metadata = false; for (auto& response : responses) { stream->Send(response); } return absl::OkStatus(); } return absl::OkStatus(); }); ZETASQL_RETURN_IF_ERROR(status); // Increment by 1 microsecond gap to avoid repetitive records. current_start = scan_end + absl::Microseconds(1); current_end = std::min( {current_start + absl::GetFlag(FLAGS_change_streams_partition_query_chop_interval), tvf_end, partition_token_end_time}); } // If expect_metadata is still true, stub a heartbeat record. if (expect_metadata == true) { ZETASQL_ASSIGN_OR_RETURN( auto extra_heartbeat, metadata().is_pg ? ConvertHeartbeatTimestampToJson(tvf_end, metadata().tvf_name, expect_metadata) : ConvertHeartbeatTimestampToStruct(tvf_end, expect_metadata)); for (auto& response : extra_heartbeat) { stream->Send(response); } } return absl::OkStatus(); } absl::Status ChangeStreamsHandler::ExecuteChangeStreamQuery( const spanner_api::ExecuteSqlRequest* request, ServerStream<spanner_api::PartialResultSet>* stream, std::shared_ptr<Session> session) { ZETASQL_RETURN_IF_ERROR( ValidateTransactionSelectorForChangeStreamQuery(request->transaction())); if (request->query_mode() == spanner_api::ExecuteSqlRequest::PLAN) { return error::EmulatorDoesNotSupportQueryPlans(); } if (!metadata().partition_token.has_value()) { return ExecuteInitialQuery(session, stream); } else { return ExecutePartitionQuery(stream, session); } } } // namespace frontend } // namespace emulator } // namespace spanner } // namespace google