absl::Status ChangeStreamsHandler::ExecutePartitionQuery()

in frontend/handlers/change_streams.cc [291:403]


// Serves a change stream partition query by scanning the partition in
// consecutive time slices and streaming the results back through `stream`.
//
// Every pass of the loop:
//   1. looks up the change stream's retention period as of the slice's
//      snapshot time,
//   2. re-reads the partition token's end time for as long as it is still
//      unknown (absl::InfiniteFuture()),
//   3. validates the token against the retention window,
//   4. runs the data-table query for the slice inside a single-use read-only
//      transaction pinned to the snapshot time, and
//   5. once the token's end time falls inside the slice, runs the
//      partition-table query in that same transaction and sends its records.
//
// If nothing carrying metadata was sent by the time the loop finishes, a
// single heartbeat stamped with the query's end time is sent instead.
//
// Returns the first non-OK status produced by any helper, otherwise OK.
absl::Status ChangeStreamsHandler::ExecutePartitionQuery(
    ServerStream<spanner_api::PartialResultSet>* stream,
    std::shared_ptr<Session> session) {
  // An absent end timestamp means the query is unbounded.
  const absl::Time tvf_end_time = metadata().end_timestamp.has_value()
                                      ? metadata().end_timestamp.value()
                                      : absl::InfiniteFuture();
  const absl::Time wall_now = Clock().Now();
  const absl::Duration heartbeat_period =
      absl::Milliseconds(metadata().heartbeat_milliseconds);
  absl::Time latest_record_time = wall_now;
  absl::Time token_end_time = absl::InfiniteFuture();
  absl::Time chunk_start = metadata().start_timestamp;
  // The first slice reaches at least up to "now" (or one chop interval past
  // the start, whichever is later) but never beyond the query's end time.
  const absl::Time first_chop_limit =
      chunk_start +
      absl::GetFlag(FLAGS_change_streams_partition_query_chop_interval);
  absl::Time chunk_end =
      std::min(std::max(wall_now, first_chop_limit), tvf_end_time);
  // Only the first response sent to the user during a query's lifetime is
  // expected to carry metadata; this stays true until that has happened.
  bool metadata_pending = true;
  while (chunk_start < token_end_time && chunk_start <= tvf_end_time) {
    // Historical queries may have a slice end in the past. Reading at "now"
    // in that case prevents a >1h stale read, which is not allowed.
    absl::Time snapshot_time = std::max(chunk_end, wall_now);
    // Re-fetch the retention period on every pass so that the most current
    // value applies to the running query.
    ZETASQL_ASSIGN_OR_RETURN(
        absl::Duration retention,
        TryGetChangeStreamRetentionPeriod(metadata().change_stream_name,
                                          session, snapshot_time));
    // While the token has no end time yet, scan the partition table again in
    // case one has been recorded since the previous pass.
    if (token_end_time == absl::InfiniteFuture()) {
      ZETASQL_ASSIGN_OR_RETURN(
          token_end_time,
          TryGetPartitionTokenEndTime(session, snapshot_time));
    }
    ZETASQL_RETURN_IF_ERROR(ValidateTokenInRetentionWindow(
        metadata().start_timestamp, chunk_start, token_end_time, retention));
    // Data records are scanned no further than the earlier of the slice end
    // and the partition token's end time.
    const absl::Time scan_limit = std::min(token_end_time, chunk_end);
    const bool heartbeat_due =
        chunk_end - latest_record_time >= heartbeat_period;
    // This transaction will be blocked until now passes the slice end.
    spanner_api::TransactionOptions read_options;
    ZETASQL_ASSIGN_OR_RETURN(
        *read_options.mutable_read_only()->mutable_read_timestamp(),
        TimestampToProto(snapshot_time));
    ZETASQL_ASSIGN_OR_RETURN(auto read_txn,
                     session->CreateSingleUseTransaction(read_options));
    absl::Status guarded_status = read_txn->GuardedCall(
        Transaction::OpType::kSql, [&]() -> absl::Status {
          backend::Query data_query =
              ConstructDataTablePartitionQuery(chunk_start, scan_limit);
          ZETASQL_ASSIGN_OR_RETURN(auto data_rows,
                           read_txn->ExecuteSql(data_query));
          ZETASQL_RETURN_IF_ERROR(ProcessDataChangeRecordsAndStreamBack(
              data_rows, heartbeat_due, scan_limit, metadata_pending,
              &latest_record_time, stream));
          // The token is still live past this slice: nothing more to send.
          if (token_end_time > chunk_end) {
            return absl::OkStatus();
          }
          // The token ends within this slice, so after all of its data
          // records have been returned, fetch and send the child partition
          // records.
          backend::Query partition_query =
              ConstructPartitionTablePartitionQuery();
          ZETASQL_ASSIGN_OR_RETURN(auto partition_rows,
                           read_txn->ExecuteSql(partition_query));
          ZETASQL_RET_CHECK(!IsQueryResultEmpty(partition_rows));
          ZETASQL_ASSIGN_OR_RETURN(
              auto partition_responses,
              metadata().is_pg
                  ? ConvertPartitionTableRowCursorToJson(
                        partition_rows.rows.get(),
                        /*initial_start_time=*/std::nullopt,
                        metadata().tvf_name, metadata_pending)
                  : ConvertPartitionTableRowCursorToStruct(
                        partition_rows.rows.get(),
                        /*initial_start_time=*/std::nullopt,
                        metadata_pending));
          metadata_pending = false;
          for (auto& partial_result : partition_responses) {
            stream->Send(partial_result);
          }
          return absl::OkStatus();
        });
    ZETASQL_RETURN_IF_ERROR(guarded_status);
    // Start the next slice 1 microsecond after the last scanned instant so
    // that records are not returned twice.
    chunk_start = scan_limit + absl::Microseconds(1);
    const absl::Time next_chop_limit =
        chunk_start +
        absl::GetFlag(FLAGS_change_streams_partition_query_chop_interval);
    chunk_end = std::min({next_chop_limit, tvf_end_time, token_end_time});
  }

  // Metadata already went out with an earlier response: nothing left to do.
  if (!metadata_pending) {
    return absl::OkStatus();
  }
  // Metadata is still pending, so stub a heartbeat record at the query's end
  // time.
  ZETASQL_ASSIGN_OR_RETURN(
      auto stub_heartbeat,
      metadata().is_pg
          ? ConvertHeartbeatTimestampToJson(tvf_end_time, metadata().tvf_name,
                                            metadata_pending)
          : ConvertHeartbeatTimestampToStruct(tvf_end_time, metadata_pending));
  for (auto& partial_result : stub_heartbeat) {
    stream->Send(partial_result);
  }
  return absl::OkStatus();
}