in frontend/handlers/change_streams.cc [291:403]
// Streams the change records of a single change stream partition token back
// to the caller.
//
// The time range starting at metadata().start_timestamp and ending at the tvf
// end time (or the partition token's end time, whichever comes first) is
// processed in chopped windows whose length comes from
// --change_streams_partition_query_chop_interval. For each window this:
//   1. re-reads the change stream retention period at the window's snapshot
//      time, so retention changes made while the query runs are applied;
//   2. while the partition token's end time is still unknown (infinite),
//      re-reads it from the partition table;
//   3. validates that the token is still within the retention window;
//   4. opens a single-use read-only transaction at max(window end, now) and
//      streams the data change records in [window start,
//      min(window end, token end)];
//   5. if the token ends within this window, streams the child partition
//      records, after which the loop terminates.
// Only the first response sent to the caller carries result-set metadata. If
// no response with metadata has been sent by the end, a heartbeat record at
// the tvf end time is sent with metadata.
//
// Returns the first non-OK status from any lookup, validation, transaction or
// query. Responses sent before the failure have already been streamed.
absl::Status ChangeStreamsHandler::ExecutePartitionQuery(
    ServerStream<spanner_api::PartialResultSet>* stream,
    std::shared_ptr<Session> session) {
  const absl::Time tvf_end = metadata().end_timestamp.has_value()
                                 ? metadata().end_timestamp.value()
                                 : absl::InfiniteFuture();
  // Read the flag once so every window of this query uses the same length.
  const absl::Duration chop_interval =
      absl::GetFlag(FLAGS_change_streams_partition_query_chop_interval);
  const absl::Time now = Clock().Now();
  const absl::Duration heartbeat_interval =
      absl::Milliseconds(metadata().heartbeat_milliseconds);
  absl::Time last_record_time = now;
  // Stays at InfiniteFuture until the partition table reports an end time for
  // this partition token.
  absl::Time partition_token_end_time = absl::InfiniteFuture();
  absl::Time current_start = metadata().start_timestamp;
  // The first window ends no earlier than `now`, but never past tvf_end.
  absl::Time current_end =
      std::min(std::max(now, current_start + chop_interval), tvf_end);
  // Metadata is only expected for the first response to users in a single
  // query's lifetime.
  bool expect_metadata = true;
  while (current_start <= tvf_end && current_start < partition_token_end_time) {
    // For historical queries where the tvf end is in the past, set the read
    // transaction snapshot time to now to prevent a >1h stale read, which is
    // not allowed.
    absl::Time current_txn_snapshot_time = std::max(current_end, now);
    // Get the newest retention period so the most up-to-date retention
    // applies to the currently running query.
    ZETASQL_ASSIGN_OR_RETURN(
        absl::Duration current_retention,
        TryGetChangeStreamRetentionPeriod(metadata().change_stream_name,
                                          session, current_txn_snapshot_time));
    // If the partition token hasn't been churned yet, re-scan the partition
    // table to see whether its end time has been set since the last window.
    if (partition_token_end_time == absl::InfiniteFuture()) {
      ZETASQL_ASSIGN_OR_RETURN(
          partition_token_end_time,
          TryGetPartitionTokenEndTime(session, current_txn_snapshot_time));
    }
    ZETASQL_RETURN_IF_ERROR(ValidateTokenInRetentionWindow(
        metadata().start_timestamp, current_start, partition_token_end_time,
        current_retention));
    // Only scan data records up to the minimum of the current chopped end
    // time and the end time of the current partition token.
    const absl::Time scan_end = std::min(partition_token_end_time, current_end);
    const bool expect_heartbeat =
        current_end - last_record_time >= heartbeat_interval;
    // This transaction will be blocked until now passes current_end.
    spanner_api::TransactionOptions txn_options;
    ZETASQL_ASSIGN_OR_RETURN(*txn_options.mutable_read_only()->mutable_read_timestamp(),
                     TimestampToProto(current_txn_snapshot_time));
    ZETASQL_ASSIGN_OR_RETURN(auto txn,
                     session->CreateSingleUseTransaction(txn_options));
    absl::Status status =
        txn->GuardedCall(Transaction::OpType::kSql, [&]() -> absl::Status {
          backend::Query read_data_query =
              ConstructDataTablePartitionQuery(current_start, scan_end);
          ZETASQL_ASSIGN_OR_RETURN(auto data_records_results,
                           txn->ExecuteSql(read_data_query));
          // NOTE(review): expect_metadata is passed without `&` (unlike
          // last_record_time); presumably the callee takes it by reference
          // and clears it once metadata has been sent -- confirm.
          ZETASQL_RETURN_IF_ERROR(ProcessDataChangeRecordsAndStreamBack(
              data_records_results, expect_heartbeat, scan_end,
              expect_metadata, &last_record_time, stream));
          if (partition_token_end_time > current_end) {
            // The token outlives this window; the next window continues it.
            return absl::OkStatus();
          }
          // The token ends within this window: after all data records have
          // been returned, emit its child partition records.
          backend::Query tail_query_partition_table =
              ConstructPartitionTablePartitionQuery();
          ZETASQL_ASSIGN_OR_RETURN(auto tail_partition_records_results,
                           txn->ExecuteSql(tail_query_partition_table));
          ZETASQL_RET_CHECK(!IsQueryResultEmpty(tail_partition_records_results));
          ZETASQL_ASSIGN_OR_RETURN(
              auto responses,
              metadata().is_pg
                  ? ConvertPartitionTableRowCursorToJson(
                        tail_partition_records_results.rows.get(),
                        /*initial_start_time=*/std::nullopt,
                        metadata().tvf_name, expect_metadata)
                  : ConvertPartitionTableRowCursorToStruct(
                        tail_partition_records_results.rows.get(),
                        /*initial_start_time=*/std::nullopt,
                        expect_metadata));
          expect_metadata = false;
          for (auto& response : responses) {
            stream->Send(response);
          }
          return absl::OkStatus();
        });
    ZETASQL_RETURN_IF_ERROR(status);
    // Start the next window 1 microsecond after this scan's end so records at
    // scan_end are not returned twice.
    current_start = scan_end + absl::Microseconds(1);
    current_end = std::min(
        {current_start + chop_interval, tvf_end, partition_token_end_time});
  }
  // If no response carrying metadata has been sent yet, stub a heartbeat
  // record at tvf_end that carries it.
  if (expect_metadata) {
    ZETASQL_ASSIGN_OR_RETURN(
        auto extra_heartbeat,
        metadata().is_pg
            ? ConvertHeartbeatTimestampToJson(tvf_end, metadata().tvf_name,
                                              expect_metadata)
            : ConvertHeartbeatTimestampToStruct(tvf_end, expect_metadata));
    for (auto& response : extra_heartbeat) {
      stream->Send(response);
    }
  }
  return absl::OkStatus();
}