frontend/converters/change_streams.cc (373 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "frontend/converters/change_streams.h"
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "google/protobuf/struct.pb.h"
#include "google/spanner/v1/result_set.pb.h"
#include "zetasql/public/analyzer.h"
#include "zetasql/public/analyzer_options.h"
#include "zetasql/public/simple_catalog.h"
#include "zetasql/public/type.h"
#include "zetasql/public/types/array_type.h"
#include "zetasql/public/types/type.h"
#include "zetasql/public/types/type_factory.h"
#include "zetasql/public/value.h"
#include "zetasql/base/no_destructor.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/strings/str_format.h"
#include "absl/strings/substitute.h"
#include "backend/access/read.h"
#include "backend/query/analyzer_options.h"
#include "common/constants.h"
#include "common/limits.h"
#include "frontend/converters/chunking.h"
#include "frontend/converters/types.h"
#include "frontend/converters/values.h"
#include "zetasql/base/ret_check.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace frontend {
namespace spanner_api = ::google::spanner::v1;
namespace {
struct ChangeStreamOutputTypes {
enum ReturningType { DATA_CHANGE, HEARTBEAT, CHILD_PARTITIONS };
ChangeStreamOutputTypes() {
ConstructChangeRecordTypes();
ConstructChildPartitionRecordTypes();
ConstructHeartbeatRecordTypes();
ConstructDataChangeRecordTypes();
}
zetasql::TypeFactory type_factory;
zetasql::SimpleCatalog catalog = zetasql::SimpleCatalog(
"For analyzing change stream output types only.");
zetasql::AnalyzerOptions analyzer_options =
backend::MakeGoogleSqlAnalyzerOptions(kDefaultTimeZone);
// Outter most change record output related types.
const zetasql::Type* change_record_arr;
const zetasql::StructType* change_record_struct;
// Heartbeat record related types.
const zetasql::ArrayType* heartbeat_record_arr;
const zetasql::StructType* heartbeat_record_struct;
// Child partition record related types.
const zetasql::ArrayType* child_partitions_record_arr;
const zetasql::StructType* child_partitions_record_struct;
const zetasql::ArrayType* child_partitions_arr;
const zetasql::StructType* child_partitions_struct;
const zetasql::ArrayType* parents_arr;
// Data change record related types
const zetasql::ArrayType* data_change_record_arr;
const zetasql::StructType* data_change_record_struct;
const zetasql::ArrayType* column_types_arr;
const zetasql::StructType* column_types_struct;
const zetasql::ArrayType* mods_arr;
const zetasql::StructType* mods_struct;
void ConstructChangeRecordTypes() {
ABSL_CHECK_OK(zetasql::AnalyzeType(
absl::Substitute(kChangeStreamTvfOutputFormat, "JSON"),
analyzer_options, &catalog, &type_factory, &change_record_arr));
change_record_struct =
change_record_arr->AsArray()->element_type()->AsStruct();
}
void ConstructHeartbeatRecordTypes() {
bool is_ambiguous;
heartbeat_record_arr = change_record_arr->AsArray()
->element_type()
->AsStruct()
->FindField("heartbeat_record", &is_ambiguous)
->type->AsArray();
heartbeat_record_struct = heartbeat_record_arr->element_type()->AsStruct();
}
void ConstructChildPartitionRecordTypes() {
bool is_ambiguous;
child_partitions_record_arr =
change_record_arr->AsArray()
->element_type()
->AsStruct()
->FindField("child_partitions_record", &is_ambiguous)
->type->AsArray();
child_partitions_record_struct =
child_partitions_record_arr->element_type()->AsStruct();
child_partitions_arr = child_partitions_record_struct
->FindField("child_partitions", &is_ambiguous)
->type->AsArray();
child_partitions_struct = child_partitions_arr->element_type()->AsStruct();
parents_arr = zetasql::types::StringArrayType();
}
void ConstructDataChangeRecordTypes() {
bool is_ambigous;
data_change_record_arr = change_record_arr->AsArray()
->element_type()
->AsStruct()
->FindField("data_change_record", &is_ambigous)
->type->AsArray();
data_change_record_struct =
data_change_record_arr->AsArray()->element_type()->AsStruct();
column_types_arr =
data_change_record_struct->FindField("column_types", &is_ambigous)
->type->AsArray();
column_types_struct =
column_types_arr->AsArray()->element_type()->AsStruct();
mods_arr = data_change_record_struct->FindField("mods", &is_ambigous)
->type->AsArray();
mods_struct = mods_arr->AsArray()->element_type()->AsStruct();
}
};
// Returns the shared ChangeStreamOutputTypes instance holding all the fixed
// record types. The instance is a function-local static, so it is built the
// first time this function is called; it is wrapped in NoDestructor.
const ChangeStreamOutputTypes* GetChangeStreamOutputTypes() {
  static const zetasql_base::NoDestructor<ChangeStreamOutputTypes>
      output_types_singleton{};
  return output_types_singleton.get();
}
// Renders `record_sequence` in decimal, left-padded with zeros to a width of
// eight characters (for example 7 becomes "00000007").
std::string ToFragmentIdString(int64_t record_sequence) {
  std::string padded_sequence = absl::StrFormat("%08d", record_sequence);
  return padded_sequence;
}
// Returns an empty array value whose type is the record array selected by
// `type` (data change, heartbeat or child partitions records).
//
// Every enumerator of ReturningType is handled in the switch. The switch has
// no `default:` label on purpose, so that a compiler can flag an enumerator
// that is added later but not handled here.
zetasql::Value CreateEmptyArrayForRecordType(
    ChangeStreamOutputTypes::ReturningType type) {
  const ChangeStreamOutputTypes* types = GetChangeStreamOutputTypes();
  switch (type) {
    case ChangeStreamOutputTypes::ReturningType::HEARTBEAT:
      return zetasql::Value::EmptyArray(types->heartbeat_record_arr);
    case ChangeStreamOutputTypes::ReturningType::CHILD_PARTITIONS:
      return zetasql::Value::EmptyArray(types->child_partitions_record_arr);
    case ChangeStreamOutputTypes::DATA_CHANGE:
      return zetasql::Value::EmptyArray(types->data_change_record_arr);
  }
  // Only reachable if `type` holds a value that is not one of the enumerators
  // above. Flowing off the end of a value-returning function is undefined
  // behaviour, so hand back a default-constructed Value instead.
  // NOTE(review): zetasql documents a default-constructed Value as invalid;
  // confirm that this is the desired sentinel for this unreachable path.
  return zetasql::Value();
}
// Wraps the three per-kind record arrays into one change record.
//
// The arguments are placed, in the order data change / heartbeat / child
// partitions, into a struct of the change record struct type. That struct is
// then returned as the only element of an array of the change record array
// type.
//
// Returns an error if either the struct or the array cannot be built from the
// given values.
absl::StatusOr<zetasql::Value> CreateChangeRecord(
    zetasql::Value data_change_record, zetasql::Value heartbeat_record,
    zetasql::Value child_partitions_record) {
  const ChangeStreamOutputTypes* types = GetChangeStreamOutputTypes();
  ZETASQL_ASSIGN_OR_RETURN(
      zetasql::Value change_record_struct_val,
      zetasql::Value::MakeStruct(
          types->change_record_struct,
          {data_change_record, heartbeat_record, child_partitions_record}));
  return zetasql::Value::MakeArray(types->change_record_arr->AsArray(),
                                   {change_record_struct_val});
}
// Builds a heartbeat record array holding exactly one heartbeat struct whose
// only field is `timestamp`.
//
// Returns an error if the struct or the array cannot be constructed.
absl::StatusOr<zetasql::Value> CreateHeartbeatRecord(absl::Time timestamp) {
  const ChangeStreamOutputTypes* types = GetChangeStreamOutputTypes();
  const std::vector<zetasql::Value> heartbeat_fields = {
      zetasql::Value::Timestamp(timestamp)};
  ZETASQL_ASSIGN_OR_RETURN(
      zetasql::Value heartbeat_struct,
      zetasql::Value::MakeStruct(types->heartbeat_record_struct,
                                 heartbeat_fields));
  return zetasql::Value::MakeArray(types->heartbeat_record_arr,
                                   {heartbeat_struct});
}
// Builds a child partitions record array (with a single record struct) from
// the row `cursor` is currently positioned on.
//
// The passed in cursor is guaranteed to have the shape of
// {start_time(TIMESTAMP), partition_token(STRING), parents(ARRAY<STRING>)}
// due to the fixed shape of change stream internal partition table.
//
// `record_sequence` is rendered with ToFragmentIdString(). When
// `initial_start_timestamp` is set (initial partition query) it is used as the
// record's start time, exactly one child partition is emitted, the parent
// token list is left empty, and the cursor is not advanced. Otherwise the
// start time comes from column 0 of the current row and the remaining rows of
// the cursor are consumed as additional child partitions.
absl::StatusOr<zetasql::Value> CreateChildPartitionRecord(
    backend::RowCursor* cursor, int64_t record_sequence,
    std::optional<absl::Time> initial_start_timestamp) {
  const ChangeStreamOutputTypes* types = GetChangeStreamOutputTypes();
  const bool is_initial_query = initial_start_timestamp.has_value();

  // The start time must be read before the loop below advances the cursor.
  std::vector<zetasql::Value> record_fields;
  if (is_initial_query) {
    record_fields.push_back(
        zetasql::Value::Timestamp(initial_start_timestamp.value()));
  } else {
    record_fields.push_back(cursor->ColumnValue(0));
  }
  record_fields.push_back(
      zetasql::Value::String(ToFragmentIdString(record_sequence)));

  // If a split event happens during partition query, one child partition
  // record can contain up to two partition tokens after the split. For a
  // non-initial query, every further row left in the cursor is therefore
  // treated as another partition token belonging to this same record.
  std::vector<zetasql::Value> child_partitions;
  while (true) {
    const zetasql::Value partition_token = cursor->ColumnValue(1);
    // Parent tokens are not populated for initial partition queries.
    const zetasql::Value parent_tokens =
        is_initial_query ? zetasql::Value::EmptyArray(types->parents_arr)
                         : cursor->ColumnValue(2);
    ZETASQL_ASSIGN_OR_RETURN(
        zetasql::Value child_partition,
        zetasql::Value::MakeStruct(types->child_partitions_struct,
                                   {partition_token, parent_tokens}));
    child_partitions.push_back(child_partition);
    if (is_initial_query || !cursor->Next()) {
      break;
    }
  }

  ZETASQL_ASSIGN_OR_RETURN(
      zetasql::Value child_partitions_array,
      zetasql::Value::MakeArray(types->child_partitions_arr,
                                child_partitions));
  record_fields.push_back(child_partitions_array);

  ZETASQL_ASSIGN_OR_RETURN(
      zetasql::Value record_struct,
      zetasql::Value::MakeStruct(types->child_partitions_record_struct,
                                 record_fields));
  ZETASQL_ASSIGN_OR_RETURN(
      zetasql::Value record_array,
      zetasql::Value::MakeArray(types->child_partitions_record_arr,
                                {record_struct}));
  return record_array;
}
// Parses the JSON text held in the STRING value `json_text` and returns it as
// a JSON-typed value. A parse failure is returned to the caller as a status
// instead of crashing the process.
absl::StatusOr<zetasql::Value> JsonValueFromStringValue(
    const zetasql::Value& json_text) {
  ZETASQL_ASSIGN_OR_RETURN(
      auto parsed_json,
      zetasql::JSONValue::ParseJSONString(json_text.string_value()));
  return zetasql::Value::Json(std::move(parsed_json));
}

// Builds a data change record array (with a single record struct) from the
// row `cursor` is currently positioned on.
//
// Returns an error if the cursor does not have 19 columns, if any of the
// parallel column_types / mods arrays is shorter than the array driving the
// loop, if a stored JSON string cannot be parsed, or if any struct/array value
// cannot be constructed.
absl::StatusOr<zetasql::Value> CreateDataChangeRecord(
    backend::RowCursor* cursor) {
  const ChangeStreamOutputTypes* types = GetChangeStreamOutputTypes();
  // The passed in cursor is guaranteed to have the shape of
  // {"partition_token(STRING)",
  // "commit_timestamp(TIMESTAMP)",
  // "server_transaction_id(STRING)",
  // "record_sequence(STRING)",
  // "is_last_record_in_transaction_in_partition(BOOL)",
  // "table_name(STRING)",
  // "column_types_name(ARRAY<STRING>)",
  // "column_types_type(ARRAY<STRING>)",
  // "column_types_is_primary_key(ARRAY<BOOL>)"
  // "column_types_ordinal_position(ARRAY<INT64>)"
  // "mods_keys(ARRAY<STRING>)",
  // "mods_new_values(ARRAY<STRING>)",
  // "mods_old_values(ARRAY<STRING>)",
  // "mod_type(STRING)",
  // "value_capture_type(STRING)",
  // "number_of_records_in_transaction(INT64)",
  // "number_of_partitions_in_transaction(INT64)",
  // "transaction_tag(STRING)",
  // "is_system_transaction(BOOL)"},
  // due to the fixed shape of change stream internal data table.
  ZETASQL_RET_CHECK(cursor->NumColumns() == 19);

  // 5 leading scalar columns + column_types + mods + 6 trailing columns.
  std::vector<zetasql::Value> values;
  values.reserve(13);

  // Skip the first column partition_token(STRING) since partition token is not
  // part of the actual data change record that will be returned to the users.
  // Columns 2 (server transaction id) and 3 (record sequence) are emitted in
  // swapped order because the output struct orders them differently from the
  // table schema.
  values.push_back(cursor->ColumnValue(1));
  values.push_back(cursor->ColumnValue(3));
  values.push_back(cursor->ColumnValue(2));
  values.push_back(cursor->ColumnValue(4));
  values.push_back(cursor->ColumnValue(5));

  // Zip the four parallel column_types_* arrays into an array of structs:
  // column_types ARRAY<STRUCT<
  //   name STRING,
  //   type JSON,
  //   is_primary_key BOOL,
  //   ordinal_position INT64>>
  const zetasql::Value column_types_name_arr = cursor->ColumnValue(6);
  const zetasql::Value column_types_type_arr = cursor->ColumnValue(7);
  const zetasql::Value column_types_is_primary_key = cursor->ColumnValue(8);
  const zetasql::Value column_types_ordinal_position = cursor->ColumnValue(9);
  const int num_column_types = column_types_name_arr.num_elements();
  // The name array drives the loop, so the others must be at least as long;
  // otherwise element(i) below would index out of range.
  ZETASQL_RET_CHECK(
      num_column_types == 0 ||
      (column_types_type_arr.num_elements() >= num_column_types &&
       column_types_is_primary_key.num_elements() >= num_column_types &&
       column_types_ordinal_position.num_elements() >= num_column_types))
      << "column_types arrays are shorter than column_types_name";
  std::vector<zetasql::Value> column_types_arr_structs;
  column_types_arr_structs.reserve(num_column_types);
  for (int i = 0; i < num_column_types; i++) {
    ZETASQL_ASSIGN_OR_RETURN(
        auto column_type_json,
        JsonValueFromStringValue(column_types_type_arr.element(i)));
    std::vector<zetasql::Value> curr_column_type_vals{
        column_types_name_arr.element(i), column_type_json,
        column_types_is_primary_key.element(i),
        column_types_ordinal_position.element(i)};
    ZETASQL_ASSIGN_OR_RETURN(
        auto curr_column_type_struct,
        zetasql::Value::MakeStruct(types->column_types_struct,
                                   curr_column_type_vals));
    column_types_arr_structs.push_back(curr_column_type_struct);
  }
  ZETASQL_ASSIGN_OR_RETURN(
      auto column_types_struct_array_val,
      zetasql::Value::MakeArray(types->column_types_arr,
                                column_types_arr_structs));
  values.push_back(column_types_struct_array_val);

  // Zip the three parallel mods_* arrays, each holding JSON text, into an
  // array of structs:
  // mods ARRAY<STRUCT<
  //   keys JSON,
  //   new_values JSON,
  //   old_values JSON>>
  const zetasql::Value mods_keys_arr = cursor->ColumnValue(10);
  const zetasql::Value mods_new_values_arr = cursor->ColumnValue(11);
  const zetasql::Value mods_old_values_arr = cursor->ColumnValue(12);
  const int num_mods = mods_keys_arr.num_elements();
  // The keys array drives the loop, so the value arrays must be at least as
  // long.
  ZETASQL_RET_CHECK(num_mods == 0 ||
            (mods_new_values_arr.num_elements() >= num_mods &&
             mods_old_values_arr.num_elements() >= num_mods))
      << "mods value arrays are shorter than mods_keys";
  std::vector<zetasql::Value> mods_arr_vals;
  mods_arr_vals.reserve(num_mods);
  for (int i = 0; i < num_mods; i++) {
    ZETASQL_ASSIGN_OR_RETURN(
        auto keys_json, JsonValueFromStringValue(mods_keys_arr.element(i)));
    ZETASQL_ASSIGN_OR_RETURN(
        auto new_values_json,
        JsonValueFromStringValue(mods_new_values_arr.element(i)));
    ZETASQL_ASSIGN_OR_RETURN(
        auto old_values_json,
        JsonValueFromStringValue(mods_old_values_arr.element(i)));
    std::vector<zetasql::Value> curr_mod_vals{keys_json, new_values_json,
                                              old_values_json};
    ZETASQL_ASSIGN_OR_RETURN(
        auto curr_mods_struct_val,
        zetasql::Value::MakeStruct(types->mods_struct, curr_mod_vals));
    mods_arr_vals.push_back(curr_mods_struct_val);
  }
  ZETASQL_ASSIGN_OR_RETURN(
      auto mods_struct_array_val,
      zetasql::Value::MakeArray(types->mods_arr, mods_arr_vals));
  values.push_back(mods_struct_array_val);

  // The remaining columns (mod_type .. is_system_transaction) are copied
  // through unchanged, in table order.
  for (int i = 13; i < 19; i++) {
    values.push_back(cursor->ColumnValue(i));
  }

  ZETASQL_ASSIGN_OR_RETURN(
      auto data_change_record_struct_val,
      zetasql::Value::MakeStruct(types->data_change_record_struct, values));
  ZETASQL_ASSIGN_OR_RETURN(
      auto final_data_change_record,
      zetasql::Value::MakeArray(types->data_change_record_arr,
                                {data_change_record_struct_val}));
  return final_data_change_record;
}
// Adds one row-type field, named kChangeStreamTvfOutputColumn and typed as
// the change record array, to the metadata of the first response.
//
// `responses` must contain at least one element (it is accessed with at(0)).
// Returns the error from TypeToProto() if the type cannot be converted.
absl::Status PopulateMetadata(
    std::vector<spanner_api::PartialResultSet>* responses) {
  spanner_api::PartialResultSet& first_response = responses->at(0);
  auto* output_field =
      first_response.mutable_metadata()->mutable_row_type()->add_fields();
  output_field->set_name(kChangeStreamTvfOutputColumn);
  const zetasql::Type* output_type =
      GetChangeStreamOutputTypes()->change_record_arr;
  ZETASQL_RETURN_IF_ERROR(
      TypeToProto(output_type, output_field->mutable_type()));
  return absl::OkStatus();
}
// Overwrites the resume token of every response in `responses` with
// kChangeStreamDummyResumeToken.
void PopulateFakeResumeTokens(
    std::vector<spanner_api::PartialResultSet>* responses) {
  for (auto it = responses->begin(); it != responses->end(); ++it) {
    *it->mutable_resume_token() = kChangeStreamDummyResumeToken;
  }
}
} // namespace
// Produces the chunked responses for a single heartbeat at `timestamp`.
//
// The result set holds one row with one change record whose heartbeat array
// has a single entry and whose data change and child partitions arrays are
// empty. When `expect_metadata` is true the first response gets the change
// stream row-type metadata; otherwise its metadata is cleared. Every response
// receives the dummy resume token.
absl::StatusOr<std::vector<spanner_api::PartialResultSet>>
ConvertHeartbeatTimestampToStruct(absl::Time timestamp, bool expect_metadata) {
  spanner_api::ResultSet result_pb;
  auto* heartbeat_row = result_pb.add_rows();

  ZETASQL_ASSIGN_OR_RETURN(zetasql::Value heartbeat_record,
                   CreateHeartbeatRecord(timestamp));
  const zetasql::Value no_data_changes = CreateEmptyArrayForRecordType(
      ChangeStreamOutputTypes::ReturningType::DATA_CHANGE);
  const zetasql::Value no_child_partitions = CreateEmptyArrayForRecordType(
      ChangeStreamOutputTypes::ReturningType::CHILD_PARTITIONS);
  ZETASQL_ASSIGN_OR_RETURN(
      zetasql::Value change_record,
      CreateChangeRecord(no_data_changes, heartbeat_record,
                         no_child_partitions));
  ZETASQL_ASSIGN_OR_RETURN(*heartbeat_row->add_values(),
                   ValueToProto(change_record));

  ZETASQL_ASSIGN_OR_RETURN(
      auto responses,
      ChunkResultSet(result_pb, limits::kMaxStreamingChunkSize));
  if (!expect_metadata) {
    responses.at(0).clear_metadata();
  } else {
    ZETASQL_RETURN_IF_ERROR(PopulateMetadata(&responses));
  }
  PopulateFakeResumeTokens(&responses);
  return responses;
}
// Produces the chunked responses for the partition table rows in
// `row_cursor`.
//
// Each pass of the loop emits one result row holding a change record whose
// child partitions array is populated and whose data change and heartbeat
// arrays are empty. Records are numbered from 0 in emission order. Building a
// child partitions record may itself advance `row_cursor` (see
// CreateChildPartitionRecord), so one result row can cover several cursor
// rows. `initial_start_timestamp` is forwarded unchanged to that function.
//
// When `expect_metadata` is true the first response gets the change stream
// row-type metadata; otherwise its metadata is cleared. Every response
// receives the dummy resume token.
absl::StatusOr<std::vector<spanner_api::PartialResultSet>>
ConvertPartitionTableRowCursorToStruct(
    backend::RowCursor* row_cursor,
    std::optional<absl::Time> initial_start_timestamp, bool expect_metadata) {
  spanner_api::ResultSet result_pb;
  for (int64_t record_sequence = 0; row_cursor->Next(); ++record_sequence) {
    auto* partition_row = result_pb.add_rows();
    ZETASQL_ASSIGN_OR_RETURN(
        zetasql::Value child_partitions_record,
        CreateChildPartitionRecord(row_cursor, record_sequence,
                                   initial_start_timestamp));
    const zetasql::Value no_data_changes =
        CreateEmptyArrayForRecordType(ChangeStreamOutputTypes::DATA_CHANGE);
    const zetasql::Value no_heartbeats = CreateEmptyArrayForRecordType(
        ChangeStreamOutputTypes::ReturningType::HEARTBEAT);
    ZETASQL_ASSIGN_OR_RETURN(
        zetasql::Value change_record,
        CreateChangeRecord(no_data_changes, no_heartbeats,
                           child_partitions_record));
    ZETASQL_ASSIGN_OR_RETURN(*partition_row->add_values(),
                     ValueToProto(change_record));
  }

  ZETASQL_ASSIGN_OR_RETURN(
      auto responses,
      ChunkResultSet(result_pb, limits::kMaxStreamingChunkSize));
  if (!expect_metadata) {
    responses.at(0).clear_metadata();
  } else {
    ZETASQL_RETURN_IF_ERROR(PopulateMetadata(&responses));
  }
  PopulateFakeResumeTokens(&responses);
  return responses;
}
// Produces the chunked responses for the data table rows in `row_cursor`.
//
// Every cursor row becomes one result row holding a change record whose data
// change array is populated and whose heartbeat and child partitions arrays
// are empty.
//
// When `expect_metadata` is true the first response gets the change stream
// row-type metadata; otherwise its metadata is cleared. Every response
// receives the dummy resume token.
absl::StatusOr<std::vector<spanner_api::PartialResultSet>>
ConvertDataTableRowCursorToStruct(backend::RowCursor* row_cursor,
                                  bool expect_metadata) {
  spanner_api::ResultSet result_pb;
  while (row_cursor->Next()) {
    auto* data_row = result_pb.add_rows();
    ZETASQL_ASSIGN_OR_RETURN(zetasql::Value data_change_record,
                     CreateDataChangeRecord(row_cursor));
    const zetasql::Value no_heartbeats = CreateEmptyArrayForRecordType(
        ChangeStreamOutputTypes::ReturningType::HEARTBEAT);
    const zetasql::Value no_child_partitions = CreateEmptyArrayForRecordType(
        ChangeStreamOutputTypes::ReturningType::CHILD_PARTITIONS);
    ZETASQL_ASSIGN_OR_RETURN(
        zetasql::Value change_record,
        CreateChangeRecord(data_change_record, no_heartbeats,
                           no_child_partitions));
    ZETASQL_ASSIGN_OR_RETURN(*data_row->add_values(),
                     ValueToProto(change_record));
  }

  ZETASQL_ASSIGN_OR_RETURN(
      auto responses,
      ChunkResultSet(result_pb, limits::kMaxStreamingChunkSize));
  if (!expect_metadata) {
    responses.at(0).clear_metadata();
  } else {
    ZETASQL_RETURN_IF_ERROR(PopulateMetadata(&responses));
  }
  PopulateFakeResumeTokens(&responses);
  return responses;
}
} // namespace frontend
} // namespace emulator
} // namespace spanner
} // namespace google