backend/actions/change_stream.cc (731 lines of code) (raw):
//
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
#include "backend/actions/change_stream.h"
#include <algorithm>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <tuple>
#include <utility>
#include <variant>
#include <vector>
#include "zetasql/public/json_value.h"
#include "zetasql/public/types/type.h"
#include "zetasql/public/types/type_factory.h"
#include "zetasql/public/value.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/log/log.h"
#include "absl/status/status.h"
#include "absl/status/statusor.h"
#include "absl/strings/str_format.h"
#include "absl/strings/string_view.h"
#include "absl/time/time.h"
#include "google/cloud/spanner/bytes.h"
#include "backend/actions/context.h"
#include "backend/actions/ops.h"
#include "backend/common/ids.h"
#include "backend/common/variant.h"
#include "backend/datamodel/key.h"
#include "backend/datamodel/key_range.h"
#include "backend/datamodel/value.h"
#include "backend/schema/catalog/change_stream.h"
#include "backend/schema/catalog/column.h"
#include "backend/schema/catalog/schema.h"
#include "backend/schema/catalog/table.h"
#include "backend/storage/iterator.h"
#include "common/constants.h"
#include "common/errors.h"
#include "common/limits.h"
#include "nlohmann/json_fwd.hpp"
#include "nlohmann/json.hpp"
#include "third_party/spanner_pg/datatypes/extended/spanner_extended_type.h"
#include "zetasql/base/status_macros.h"
namespace google {
namespace spanner {
namespace emulator {
namespace backend {
// Aliases for the JSON helpers and PostgreSQL-dialect type metadata used when
// serializing data change records.
using ::zetasql::JSONValueRef;
using JSON = ::nlohmann::json;
using ::google::spanner::v1::TypeAnnotationCode;
using ::postgres_translator::spangres::datatypes::SpannerExtendedType;
// Mod type labels stored in a data change record, one per kind of write op.
static constexpr absl::string_view kInsert = "INSERT";
static constexpr absl::string_view kUpdate = "UPDATE";
static constexpr absl::string_view kDelete = "DELETE";
// JSON text written for a mod that has no new (or no old) values.
static constexpr absl::string_view kMinimumValidJson = "{}";
// Type code emitted for array columns; the element type is nested under it.
static constexpr absl::string_view kArray = "ARRAY";
// Builds a reverse index from each table to the change streams that track it.
//
// For every change stream in `schema`, each table name it tracks is resolved
// with Schema::FindTable and the stream is appended to that table's list.
absl::flat_hash_map<const Table*, std::vector<const ChangeStream*>>
RetrieveTableWithTrackedChangeStreams(const Schema* schema) {
  absl::flat_hash_map<const Table*, std::vector<const ChangeStream*>>
      streams_by_table;
  for (const ChangeStream* stream : schema->change_streams()) {
    // Only the table names matter here; the tracked column lists are ignored.
    const absl::flat_hash_map<std::string, std::vector<std::string>>
        tracked = stream->tracked_tables_columns();
    for (const auto& entry : tracked) {
      const Table* tracked_table = schema->FindTable(entry.first);
      streams_by_table[tracked_table].emplace_back(stream);
    }
  }
  return streams_by_table;
}
// Returns the partition token that mods for `change_stream` should be written
// under in this transaction.
//
// The stream's partition table is scanned in full; a partition counts as
// active when its `end_time` column is NULL. Of all active partitions the
// lexicographically smallest token is returned, so the choice is
// deterministic.
//
// Returns an error if the read fails, or an internal error if the change
// stream has no active partition (previously this indexed into an empty
// vector, which is undefined behaviour).
absl::StatusOr<zetasql::Value> RetrieveChangeStreamWithPartitionToken(
    ReadOnlyStore* store, const ChangeStream* change_stream) {
  const auto* partition_table = change_stream->change_stream_partition_table();
  std::vector<const Column*> read_columns = {
      partition_table->FindColumn("partition_token"),
      partition_table->FindColumn("end_time")};
  ZETASQL_ASSIGN_OR_RETURN(
      std::unique_ptr<StorageIterator> itr,
      store->Read(partition_table, KeyRange::All(), read_columns));
  std::vector<std::string> active_partition_tokens;
  while (itr->Next()) {
    // Find active partitions by filtering partitions with the end_time equal
    // to the default null end_timestamp. Stale partitions' end_timestamps are
    // set to the transaction commit timestamp.
    if (itr->ColumnValue(1).is_null()) {
      active_partition_tokens.push_back(itr->ColumnValue(0).string_value());
    }
  }
  if (active_partition_tokens.empty()) {
    return absl::InternalError(
        absl::StrFormat("No active partition found for change stream %s.",
                        change_stream->Name()));
  }
  // Only the smallest token is needed, so a full sort is unnecessary.
  return zetasql::Value::String(*std::min_element(
      active_partition_tokens.begin(), active_partition_tokens.end()));
}
// Returns true when `column` has the same name as one of the primary key
// columns of `table`.
bool IsPrimaryKey(const Table* table, const Column* column) {
  for (const KeyColumn* key_column : table->primary_key()) {
    if (key_column->column()->Name() == column->Name()) {
      return true;
    }
  }
  return false;
}
// Returns the 1-based position of `column` (matched by name) within
// `table->columns()`. If no column matches, the result is one past the number
// of columns in the table.
int64_t GetOrdinalPosition(const Table* table, const Column* column) {
  int64_t position = 0;
  for (const Column* candidate : table->columns()) {
    ++position;
    if (candidate->Name() == column->Name()) {
      return position;
    }
  }
  return position + 1;
}
// Checks whether the columns of the current write op that are tracked by
// `change_stream` are exactly the column set recorded in `last_mod_group`.
//
// A column of `op_columns` is considered when Column::FindChangeStream reports
// that the stream tracks it. The sets match when they have the same size and
// every such column name is present in
// `last_mod_group.non_key_column_names`. `table` is currently unused.
bool CheckIfNonKeyColumnsRemainSame(std::vector<const Column*> op_columns,
                                    ModGroup last_mod_group, const Table* table,
                                    const ChangeStream* change_stream) {
  std::vector<const Column*> tracked_by_stream;
  for (const Column* candidate : op_columns) {
    if (candidate->FindChangeStream(change_stream->Name())) {
      tracked_by_stream.push_back(candidate);
    }
  }
  if (tracked_by_stream.size() != last_mod_group.non_key_column_names.size()) {
    return false;
  }
  for (const Column* tracked : tracked_by_stream) {
    if (!last_mod_group.non_key_column_names.contains(tracked->Name())) {
      return false;
    }
  }
  return true;
}
// Formats a record sequence number as a decimal string, zero-padded on the
// left to a minimum width of eight characters.
std::string ToFragmentIdString(int64_t record_sequence) {
  std::string fragment_id = absl::StrFormat("%08d", record_sequence);
  return fragment_id;
}
// Converts the pending mod group of `change_stream` into a DataChangeRecord.
//
// Mods inside one DataChangeRecord share the same mod type, user table,
// tracked non-key columns, and change stream. The partition token, column
// types, mods and mod type come from the stream's entry in
// `last_mod_group_by_change_stream`; the commit timestamp is the commit
// timestamp sentinel and the record sequence is `record_sequence_number`
// rendered by ToFragmentIdString.
DataChangeRecord BuildDataChangeRecord(
    std::string tracked_table_name, std::string value_capture_type,
    const ChangeStream* change_stream, TransactionID transaction_id,
    int64_t record_sequence_number,
    absl::flat_hash_map<const ChangeStream*, ModGroup>*
        last_mod_group_by_change_stream) {
  // Look the group up once instead of once per field.
  ModGroup& pending_group = (*last_mod_group_by_change_stream)[change_stream];
  std::vector<ColumnType> column_types = pending_group.column_types;
  std::string record_sequence = ToFragmentIdString(record_sequence_number);
  return DataChangeRecord{
      pending_group.partition_token_str,
      zetasql::Value::Timestamp(kCommitTimestampValueSentinel),
      std::to_string(transaction_id),
      record_sequence,
      false,
      tracked_table_name,
      column_types,
      pending_group.mods,
      pending_group.mod_type,
      value_capture_type,
      -1,  // number_of_records_in_transaction will be reset after processing
           // all mods in one transaction
      1,
      "",
      false};
}
// Reads the committed values of the named columns for the row identified by
// `key` in `table`.
//
// Column names are resolved with Table::FindColumn, in the order given, and
// the read is delegated to ReadOnlyStore::ReadCommitted. Any read error is
// returned to the caller.
absl::StatusOr<ValueList> RetrieveExistingValues(
    const Table* table, std::vector<std::string> non_key_columns,
    const Key& key, ReadOnlyStore* store) {
  std::vector<const Column*> columns_to_read(non_key_columns.size());
  std::transform(non_key_columns.begin(), non_key_columns.end(),
                 columns_to_read.begin(),
                 [table](const std::string& column_name) {
                   return table->FindColumn(column_name);
                 });
  ZETASQL_ASSIGN_OR_RETURN(ValueList committed_values,
                   store->ReadCommitted(table, key, columns_to_read));
  return committed_values;
}
std::vector<const Column*> GetColumnsForDataChangeRecord(
std::string value_capture_type, absl::string_view mod_type,
const Table* tracked_table,
std::vector<const Column*> modified_tracked_columns,
const ChangeStream* change_stream) {
std::vector<const Column*> columns_for_data_change_record;
// For updates in OLD_AND_NEW_VALUES or NEW_VALUES, we only need the modified
// tracking columns since tracked un-modified columns will not be recorded.
if (mod_type == kUpdate &&
(value_capture_type == kChangeStreamValueCaptureTypeDefault ||
value_capture_type == kChangeStreamValueCaptureTypeNewValues)) {
return modified_tracked_columns;
} else {
// For (1)INSERT for all value capture types (2) DELETE for all value
// capture types or (3) UPDATE for NEW_ROW and NEW_ROW_AND_OLD_VALUES,
// column_types should also contain all tracked but not populated columns in
// the table.
for (auto column : tracked_table->columns()) {
if (column->FindChangeStream(change_stream->Name()) ||
IsPrimaryKey(tracked_table, column)) {
columns_for_data_change_record.push_back(column);
}
}
return columns_for_data_change_record;
}
}
// Gets the old values for the recorded non-key columns in a data change
// record.
//
// Old values are only produced for the default capture type and for
// NEW_ROW_AND_OLD_VALUES, and never for INSERTs; in every other case an empty
// vector is returned. Otherwise the committed values of `non_key_cols` are
// read from `store`. For an UPDATE under NEW_ROW_AND_OLD_VALUES, entries for
// columns that were not modified are replaced with an invalid (default
// constructed) value so that they can be skipped during serialization.
absl::StatusOr<std::vector<zetasql::Value>> GetOldValuesForDataChangeRecord(
    absl::string_view value_capture_type, absl::string_view mod_type,
    const Table* tracked_table, const Key& key, ReadOnlyStore* store,
    const std::vector<std::string>& non_key_cols,
    const absl::flat_hash_set<std::string>& modified_tracked_column_names) {
  const bool capture_type_has_old_values =
      value_capture_type == kChangeStreamValueCaptureTypeDefault ||
      value_capture_type == kChangeStreamValueCaptureTypeNewRowOldValues;
  if (mod_type == kInsert || !capture_type_has_old_values) {
    return std::vector<zetasql::Value>();
  }

  ZETASQL_ASSIGN_OR_RETURN(
      std::vector<zetasql::Value> committed_values,
      RetrieveExistingValues(tracked_table, non_key_cols, key, store));

  const bool blank_unmodified_columns =
      value_capture_type == kChangeStreamValueCaptureTypeNewRowOldValues &&
      mod_type == kUpdate;
  if (!blank_unmodified_columns) {
    return committed_values;
  }
  for (size_t idx = 0; idx < committed_values.size(); ++idx) {
    if (modified_tracked_column_names.contains(non_key_cols[idx])) {
      continue;
    }
    committed_values[idx] = zetasql::Value();
  }
  return committed_values;
}
// Gets the new values for the recorded columns in a data change record, one
// entry per element of `tracked_columns` (except in the pass-through case).
//
// * DELETE, and UPDATE under the default capture type or NEW_VALUES: the
//   written values (`modified_tracked_values`) are returned unchanged.
// * INSERT: each tracked column gets its written value, or a NULL string
//   value when the insert did not populate it.
// * Remaining UPDATE cases: columns that the update did not write are filled
//   in with their committed values read from `store`.
absl::StatusOr<std::vector<zetasql::Value>> GetNewValuesForDataChangeRecord(
    absl::string_view value_capture_type, absl::string_view mod_type,
    const Table* tracked_table,
    const std::vector<std::string> modified_tracked_columns_names,
    const std::vector<zetasql::Value>& modified_tracked_values,
    const Key& key, ReadOnlyStore* store,
    std::vector<const Column*> tracked_columns) {
  const bool pass_through_written_values =
      mod_type == kDelete ||
      (mod_type == kUpdate &&
       (value_capture_type == kChangeStreamValueCaptureTypeDefault ||
        value_capture_type == kChangeStreamValueCaptureTypeNewValues));
  if (pass_through_written_values) {
    return modified_tracked_values;
  }

  // Index the written values by column name.
  absl::flat_hash_map<std::string, zetasql::Value> written_by_column;
  for (size_t idx = 0; idx < modified_tracked_columns_names.size(); ++idx) {
    written_by_column[modified_tracked_columns_names[idx]] =
        modified_tracked_values[idx];
  }

  std::vector<zetasql::Value> row_values;
  if (mod_type == kInsert) {
    for (const Column* column : tracked_columns) {
      const auto written = written_by_column.find(column->Name());
      if (written != written_by_column.end()) {
        row_values.push_back(written->second);
      } else {
        row_values.push_back(zetasql::Value::NullString());
      }
    }
    return row_values;
  }

  // Non-key tracked columns that this write did not touch.
  std::vector<std::string> untouched_columns;
  for (const Column* column : tracked_columns) {
    if (IsPrimaryKey(tracked_table, column)) continue;
    if (written_by_column.contains(column->Name())) continue;
    untouched_columns.push_back(column->Name());
  }

  // For UPDATES with NEW_ROW, read the existing values for the untouched
  // columns from the user table.
  ZETASQL_ASSIGN_OR_RETURN(
      std::vector<zetasql::Value> committed_values,
      RetrieveExistingValues(tracked_table, untouched_columns, key, store));
  absl::flat_hash_map<std::string, zetasql::Value> committed_by_column;
  for (size_t idx = 0; idx < untouched_columns.size(); ++idx) {
    committed_by_column[untouched_columns[idx]] = committed_values[idx];
  }

  // Merge written and committed values, following the tracked column order.
  // A column found in neither map yields a default-constructed value.
  for (const Column* column : tracked_columns) {
    const auto written = written_by_column.find(column->Name());
    if (written != written_by_column.end()) {
      row_values.push_back(written->second);
      continue;
    }
    const auto committed = committed_by_column.find(column->Name());
    row_values.push_back(committed != committed_by_column.end()
                             ? committed->second
                             : zetasql::Value());
  }
  return row_values;
}
// Accumulates one modification of `tracked_table` into the pending mod group
// of `change_stream`, flushing the previous group into a DataChangeRecord when
// the new mod cannot share a record with it.
//
// key: key of the modified row.
// modified_tracked_columns / modified_tracked_values: the columns written by
//   the op that are relevant to this change stream, and their values.
// mod_type: one of kInsert, kUpdate, kDelete.
// partition_token: token stored in a newly created mod group.
// data_change_records_in_transaction_by_change_stream: finished records per
//   change stream; a flushed group is appended here.
// last_mod_group_by_change_stream: the pending (not yet flushed) group per
//   change stream; updated in place.
// store: used to read committed values when old or unmodified values are
//   needed.
//
// Returns an error if reading existing values from `store` fails.
absl::Status LogTableMod(
const Key& key, std::vector<const Column*> modified_tracked_columns,
const std::vector<zetasql::Value>& modified_tracked_values,
const Table* tracked_table, const ChangeStream* change_stream,
absl::string_view mod_type, zetasql::Value partition_token,
absl::flat_hash_map<const ChangeStream*, std::vector<DataChangeRecord>>*
data_change_records_in_transaction_by_change_stream,
TransactionID transaction_id,
absl::flat_hash_map<const ChangeStream*, ModGroup>*
last_mod_group_by_change_stream,
ReadOnlyStore* store) {
// Fall back to the default capture type when the stream does not set one.
std::string value_capture_type =
change_stream->value_capture_type().has_value()
? change_stream->value_capture_type().value()
: std::string(kChangeStreamValueCaptureTypeDefault);
// Columns this record will describe; depends on mod type and capture type.
std::vector<const Column*> tracked_columns =
GetColumnsForDataChangeRecord(value_capture_type, mod_type, tracked_table,
modified_tracked_columns, change_stream);
// A pending group can only absorb this mod if mod type, table and tracked
// column set all match. Otherwise the pending group is turned into a record
// (its sequence number is the count of records already built for this stream)
// and removed, so that a fresh group is started below.
if (last_mod_group_by_change_stream->contains(change_stream)) {
ModGroup& last_mod_group =
(*last_mod_group_by_change_stream)[change_stream];
bool same_non_pk_columns = CheckIfNonKeyColumnsRemainSame(
tracked_columns, last_mod_group, tracked_table, change_stream);
if (last_mod_group.mod_type != mod_type ||
last_mod_group.table_name != tracked_table->Name() ||
!same_non_pk_columns) {
DataChangeRecord record = BuildDataChangeRecord(
last_mod_group.table_name, value_capture_type, change_stream,
transaction_id,
(*data_change_records_in_transaction_by_change_stream)[change_stream]
.size(),
last_mod_group_by_change_stream);
last_mod_group_by_change_stream->erase(change_stream);
(*data_change_records_in_transaction_by_change_stream)[change_stream]
.push_back(record);
}
}
std::vector<std::string> modified_tracked_column_names;
modified_tracked_column_names.reserve(modified_tracked_columns.size());
for (const Column* column : modified_tracked_columns) {
modified_tracked_column_names.push_back(column->Name());
}
// New values are indexed in parallel with `tracked_columns` in the loop below.
ZETASQL_ASSIGN_OR_RETURN(std::vector<zetasql::Value> updated_values,
GetNewValuesForDataChangeRecord(
value_capture_type, mod_type, tracked_table,
modified_tracked_column_names, modified_tracked_values,
key, store, tracked_columns));
std::vector<zetasql::Value> new_values_for_tracked_cols;
std::vector<zetasql::Value> old_values_for_tracked_cols;
std::vector<ColumnType> column_types;
std::vector<std::string> non_key_cols;
// Describe every tracked column; only non-key columns contribute to the
// new-values list, and deletes contribute no new values at all.
for (int i = 0; i < tracked_columns.size(); ++i) {
const Column* column = tracked_columns[i];
bool is_primary_key = IsPrimaryKey(tracked_table, column);
int64_t ordinal_position = GetOrdinalPosition(tracked_table, column);
ColumnType column_type{column->Name(), column->GetType(), is_primary_key,
ordinal_position};
column_types.push_back(column_type);
if (!is_primary_key) {
non_key_cols.push_back(column->Name());
if (mod_type != kDelete) {
new_values_for_tracked_cols.push_back(updated_values[i]);
}
}
}
ZETASQL_ASSIGN_OR_RETURN(
old_values_for_tracked_cols,
GetOldValuesForDataChangeRecord(
value_capture_type, mod_type, tracked_table, key, store, non_key_cols,
absl::flat_hash_set<std::string>{
modified_tracked_column_names.begin(),
modified_tracked_column_names.end()}));
// Column types are reported in table column order.
std::sort(
column_types.begin(), column_types.end(),
[](const ColumnType& col_type_a, const ColumnType& col_type_b) {
return (col_type_a.ordinal_position < col_type_b.ordinal_position);
});
// An UPDATE that produced no new non-key values is not recorded.
if (!new_values_for_tracked_cols.empty() || mod_type != kUpdate) {
if (!last_mod_group_by_change_stream->contains(change_stream)) {
(*last_mod_group_by_change_stream)[change_stream] =
ModGroup{.table_name = tracked_table->Name(),
.mod_type = mod_type,
.non_key_column_names = {},
.column_types = {},
.mods = {},
.partition_token_str = partition_token};
}
// For an existing group only the table name, column names and column types
// are refreshed; its mod type and partition token are left as they were.
(*last_mod_group_by_change_stream)[change_stream].table_name =
tracked_table->Name();
(*last_mod_group_by_change_stream)[change_stream].non_key_column_names = {
non_key_cols.begin(), non_key_cols.end()};
(*last_mod_group_by_change_stream)[change_stream].column_types = {
column_types.begin(), column_types.end()};
Mod mod{tracked_table->primary_key(), non_key_cols, key.column_values(),
new_values_for_tracked_cols, old_values_for_tracked_cols};
(*last_mod_group_by_change_stream)[change_stream].mods.push_back(mod);
}
return absl::OkStatus();
}
// Filters the columns written by an op down to those relevant to
// `change_stream`: columns the stream tracks plus primary key columns of
// `table`. Returns the kept columns together with their values, in the
// original order. `columns` and `values` are read in parallel by index.
std::pair<std::vector<const Column*>, std::vector<zetasql::Value>>
GetTrackedColumnsAndValues(std::vector<const Column*> columns,
                           std::vector<zetasql::Value> values,
                           const ChangeStream* change_stream,
                           const Table* table) {
  std::vector<const Column*> kept_columns;
  std::vector<zetasql::Value> kept_values;
  for (size_t idx = 0; idx < columns.size(); ++idx) {
    const Column* candidate = columns[idx];
    const bool is_relevant =
        candidate->FindChangeStream(change_stream->Name()) ||
        IsPrimaryKey(table, candidate);
    if (!is_relevant) {
      continue;
    }
    kept_columns.push_back(candidate);
    kept_values.push_back(values[idx]);
  }
  return std::make_pair(kept_columns, kept_values);
}
// Logs a single buffered write op against `change_stream`.
//
// Dispatches on the concrete op type. An op is ignored (OK is returned) when
// the stream excludes that mod type. Inserts and updates are reduced to the
// columns relevant to the stream before being logged; deletes are logged with
// the table's primary key columns and no values. Errors from the underlying
// LogTableMod overload are returned unchanged.
absl::Status LogTableMod(
    WriteOp op, const ChangeStream* change_stream,
    zetasql::Value partition_token,
    absl::flat_hash_map<const ChangeStream*, std::vector<DataChangeRecord>>*
        data_change_records_in_transaction_by_change_stream,
    TransactionID transaction_id,
    absl::flat_hash_map<const ChangeStream*, ModGroup>*
        last_mod_group_by_change_stream,
    ReadOnlyStore* store) {
  // Shared path for ops that carry columns and values (insert and update).
  auto log_write = [&](const auto& write,
                       absl::string_view mod_type) -> absl::Status {
    auto [relevant_columns, relevant_values] = GetTrackedColumnsAndValues(
        write.columns, write.values, change_stream, write.table);
    return LogTableMod(write.key, relevant_columns, relevant_values,
                       write.table, change_stream, mod_type, partition_token,
                       data_change_records_in_transaction_by_change_stream,
                       transaction_id, last_mod_group_by_change_stream, store);
  };

  return std::visit(
      overloaded{
          [&](const InsertOp& insert) -> absl::Status {
            if (change_stream->exclude_insert().value_or(false)) {
              return absl::OkStatus();
            }
            return log_write(insert, kInsert);
          },
          [&](const UpdateOp& update) -> absl::Status {
            if (change_stream->exclude_update().value_or(false)) {
              return absl::OkStatus();
            }
            return log_write(update, kUpdate);
          },
          [&](const DeleteOp& deletion) -> absl::Status {
            if (change_stream->exclude_delete().value_or(false)) {
              return absl::OkStatus();
            }
            std::vector<const Column*> key_columns;
            for (const KeyColumn* key_column :
                 deletion.table->primary_key()) {
              key_columns.push_back(key_column->column());
            }
            return LogTableMod(
                deletion.key, key_columns, {}, deletion.table, change_stream,
                kDelete, partition_token,
                data_change_records_in_transaction_by_change_stream,
                transaction_id, last_mod_group_by_change_stream, store);
          },
      },
      op);
}
// Serializes a column type as the JSON text stored in a data change record.
//
// Scalars produce {"code": <type>}. Arrays produce
// {"code": "ARRAY", "array_element_type": {"code": <element type>}}.
// Extended (PostgreSQL) types additionally get a "type_annotation" next to the
// scalar "code": PG_JSONB maps to JSON/PG_JSONB and PG_NUMERIC to
// NUMERIC/PG_NUMERIC; any other extended type yields empty strings for both.
std::string GsqlTypeToSpannerType(const zetasql::Type* type) {
  const bool is_array = type->IsArray();
  const zetasql::Type* scalar_type =
      is_array ? type->AsArray()->element_type() : type;

  JSON spanner_type;
  if (is_array) {
    spanner_type["code"] = kArray;
  }
  // The JSON object that describes the scalar: nested for arrays, top level
  // otherwise.
  JSON& scalar_json =
      is_array ? spanner_type["array_element_type"] : spanner_type;

  std::string scalar_code;
  if (scalar_type->kind() != zetasql::TYPE_EXTENDED) {
    scalar_code = scalar_type->TypeName(zetasql::PRODUCT_EXTERNAL,
                                        /*use_external_float32=*/true);
  } else {
    std::string annotation_code;
    const auto extended_code =
        static_cast<const SpannerExtendedType*>(scalar_type)->code();
    if (extended_code == TypeAnnotationCode::PG_JSONB) {
      scalar_code = "JSON";
      annotation_code = "PG_JSONB";
    } else if (extended_code == TypeAnnotationCode::PG_NUMERIC) {
      scalar_code = "NUMERIC";
      annotation_code = "PG_NUMERIC";
    }
    scalar_json["type_annotation"] = annotation_code;
  }
  scalar_json["code"] = scalar_code;
  return spanner_type.dump();
}
// Writes a non-null `value` into the JSON node `ref` using the encoding of
// data change records.
//
// DOUBLE and FLOAT become JSON numbers and BOOL a JSON boolean. STRING, INT64
// (via its SQL literal), TIMESTAMP (UTC, "%Y-%m-%d%ET%H:%M:%SZ"), BYTES
// (base64), DATE, NUMERIC, JSON and extended types become JSON strings. ARRAY
// becomes a JSON array whose elements are converted recursively. Any other
// type kind leaves `ref` untouched.
void CloudValueToJSONValue(const zetasql::Value value, JSONValueRef& ref) {
  switch (value.type_kind()) {
    case zetasql::TYPE_DOUBLE:
      ref.SetDouble(value.double_value());
      return;
    case zetasql::TYPE_FLOAT:
      ref.SetDouble(static_cast<double>(value.float_value()));
      return;
    case zetasql::TYPE_BOOL:
      ref.SetBoolean(value.bool_value());
      return;
    case zetasql::TYPE_STRING:
      ref.SetString(value.string_value());
      return;
    case zetasql::TYPE_INT64:
      ref.SetString(value.GetSQLLiteral());
      return;
    case zetasql::TYPE_TIMESTAMP:
      ref.SetString(absl::FormatTime("%Y-%m-%d%ET%H:%M:%SZ", value.ToTime(),
                                     absl::UTCTimeZone()));
      return;
    case zetasql::TYPE_BYTES:
      ref.SetString(google::cloud::spanner_internal::BytesToBase64(
          google::cloud::spanner::Bytes(value.bytes_value())));
      return;
    // These kinds all use the value's plain formatted text.
    case zetasql::TYPE_DATE:
    case zetasql::TYPE_NUMERIC:
    case zetasql::TYPE_JSON:
    case zetasql::TypeKind::TYPE_EXTENDED:
      ref.SetString(value.Format(false));
      return;
    case zetasql::TYPE_ARRAY: {
      const int element_count = value.num_elements();
      if (element_count == 0) {
        ref.SetToEmptyArray();
        return;
      }
      for (int idx = 0; idx < element_count; ++idx) {
        JSONValueRef element_ref = ref.GetArrayElement(idx);
        CloudValueToJSONValue(value.element(idx), element_ref);
      }
      return;
    }
    default:
      return;
  }
}
std::string CloudListValueToJSONString(
std::vector<std::string> col_names,
const std::vector<zetasql::Value> mod_values) {
zetasql::JSONValue json_value;
JSONValueRef ref = json_value.GetRef();
for (int i = 0; i < col_names.size(); i++) {
// Skip invalid old values since they belong to unmodified columns in
// NEW_ROW_AND_OLD_VALUES mode
if (!mod_values[i].is_valid()) continue;
JSONValueRef col_ref = ref.GetMember(col_names[i]);
if (mod_values[i].is_null()) {
col_ref.SetNull();
} else {
CloudValueToJSONValue(mod_values[i], col_ref);
}
}
return ref.ToString();
}
// Builds the key of a row in a change stream data table.
//
// Key columns are added in this order: `partition_token_str`,
// `commit_timestamp`, `server_transaction_id`, `record_sequence` (the last two
// wrapped as string values). Returns a KeyTooLarge error naming `table_name`
// when the key's logical size exceeds limits::kMaxKeySizeBytes.
absl::StatusOr<Key> ComputeChangeStreamDataTableKey(
    zetasql::Value partition_token_str, zetasql::Value commit_timestamp,
    std::string record_sequence, std::string server_transaction_id,
    std::string table_name) {
  Key data_table_key;
  data_table_key.AddColumn(partition_token_str, false);
  data_table_key.AddColumn(commit_timestamp, false);
  data_table_key.AddColumn(zetasql::Value::String(server_transaction_id),
                           false);
  data_table_key.AddColumn(zetasql::Value::String(record_sequence), false);

  if (const int64_t logical_size = data_table_key.LogicalSizeInBytes();
      logical_size > limits::kMaxKeySizeBytes) {
    return error::KeyTooLarge(table_name, logical_size,
                              limits::kMaxKeySizeBytes);
  }
  return data_table_key;
}
// Converts a DataChangeRecord into an InsertOp on the change stream's data
// table.
//
// change_stream: stream whose data table receives the row.
// record: the record to serialize.
// columns: columns of the data table; `values` is built in the matching
//   order (partition token, commit timestamp, server transaction id, record
//   sequence, last-record flag, table name, the four column-type arrays, the
//   three mod arrays, mod type, value capture type, record/partition counts,
//   transaction tag, system-transaction flag).
//
// Returns a KeyTooLarge error if the data table key exceeds the key size
// limit.
absl::StatusOr<WriteOp> ConvertDataChangeRecordToWriteOp(
    const ChangeStream* change_stream, DataChangeRecord record,
    std::vector<const Column*> columns) {
  // Compute change_stream_data_table key. ComputeChangeStreamDataTableKey
  // declares `record_sequence` before `server_transaction_id`; the arguments
  // were previously passed the other way round, which put the record sequence
  // ahead of the transaction id in the key, unlike the row values below.
  ZETASQL_ASSIGN_OR_RETURN(
      Key change_stream_data_table_key,
      ComputeChangeStreamDataTableKey(
          record.partition_token, record.commit_timestamp,
          record.record_sequence, record.server_transaction_id,
          change_stream->change_stream_data_table()->Name()));

  std::vector<zetasql::Value> values;
  values.push_back(record.partition_token);
  values.push_back(record.commit_timestamp);
  values.push_back(zetasql::Value::String(record.server_transaction_id));
  values.push_back(zetasql::Value::String(record.record_sequence));
  values.push_back(zetasql::Value::Bool(
      record.is_last_record_in_transaction_in_partition));
  values.push_back(zetasql::Value::String(record.tracked_table_name));

  // Column types are stored as four parallel arrays.
  std::vector<zetasql::Value> column_types_name;
  std::vector<zetasql::Value> column_types_type;
  std::vector<zetasql::Value> column_types_is_primary_key;
  std::vector<zetasql::Value> column_types_ordinal_position;
  column_types_name.reserve(record.column_types.size());
  column_types_type.reserve(record.column_types.size());
  column_types_is_primary_key.reserve(record.column_types.size());
  column_types_ordinal_position.reserve(record.column_types.size());
  for (const ColumnType& column_type : record.column_types) {
    column_types_name.push_back(zetasql::Value::String(column_type.name));
    column_types_type.push_back(
        zetasql::Value::String(GsqlTypeToSpannerType(column_type.type)));
    column_types_is_primary_key.push_back(
        zetasql::Value::Bool(column_type.is_primary_key));
    column_types_ordinal_position.push_back(
        zetasql::Value::Int64(column_type.ordinal_position));
  }
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            column_types_name));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            column_types_type));
  values.push_back(zetasql::values::Array(zetasql::types::BoolArrayType(),
                                            column_types_is_primary_key));
  values.push_back(zetasql::values::Array(zetasql::types::Int64ArrayType(),
                                            column_types_ordinal_position));

  // Mods are stored as three parallel arrays of JSON text.
  std::vector<zetasql::Value> mods_keys;
  std::vector<zetasql::Value> mods_new_values;
  std::vector<zetasql::Value> mods_old_values;
  mods_keys.reserve(record.mods.size());
  mods_new_values.reserve(record.mods.size());
  mods_old_values.reserve(record.mods.size());
  for (const Mod& mod : record.mods) {
    // Each mod gets its own JSON object so that key members written for one
    // mod can never carry over into the next.
    zetasql::JSONValue keys_json;
    JSONValueRef ref = keys_json.GetRef();
    for (size_t i = 0; i < mod.key_columns.size(); ++i) {
      JSONValueRef key_col_ref =
          ref.GetMember(mod.key_columns[i]->column()->Name());
      // In theory, key values can't be null but double check to avoid any
      // potential crash
      if (mod.keys[i].is_null()) {
        key_col_ref.SetNull();
      } else {
        CloudValueToJSONValue(mod.keys[i], key_col_ref);
      }
    }
    mods_keys.push_back(zetasql::Value::String(ref.ToString()));
    // A mod without new (or old) values is stored as an empty JSON object.
    if (mod.new_values.empty()) {
      mods_new_values.push_back(zetasql::Value::String(kMinimumValidJson));
    } else {
      std::string new_values_json_str =
          CloudListValueToJSONString(mod.non_key_columns, mod.new_values);
      mods_new_values.push_back(zetasql::Value::String(new_values_json_str));
    }
    if (mod.old_values.empty()) {
      mods_old_values.push_back(zetasql::Value::String(kMinimumValidJson));
    } else {
      std::string old_values_json_str =
          CloudListValueToJSONString(mod.non_key_columns, mod.old_values);
      mods_old_values.push_back(zetasql::Value::String(old_values_json_str));
    }
  }
  values.push_back(
      zetasql::values::Array(zetasql::types::StringArrayType(), mods_keys));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            mods_new_values));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            mods_old_values));

  values.push_back(zetasql::Value::String(record.mod_type));
  values.push_back(zetasql::Value::String(record.value_capture_type));
  values.push_back(
      zetasql::Value::Int64(record.number_of_records_in_transaction));
  values.push_back(
      zetasql::Value::Int64(record.number_of_partitions_in_transaction));
  values.push_back(zetasql::Value::String(record.transaction_tag));
  values.push_back(zetasql::Value::Bool(record.is_system_transaction));
  return InsertOp{change_stream->change_stream_data_table(),
                  change_stream_data_table_key, columns, values};
}
// Finalizes the data change records of a transaction and converts them into
// WriteOps on the change stream data tables.
//
// data_change_records_in_transaction_by_change_stream: records already built
//   per change stream; pending groups are appended to it here.
// last_mod_group_by_change_stream: pending mod groups that have not yet been
//   turned into records.
//
// For each change stream the last record is flagged as
// is_last_record_in_transaction_in_partition, and every emitted record carries
// the stream's total record count in number_of_records_in_transaction.
std::vector<WriteOp> BuildMutation(
    absl::flat_hash_map<const ChangeStream*, std::vector<DataChangeRecord>>*
        data_change_records_in_transaction_by_change_stream,
    TransactionID transaction_id,
    absl::flat_hash_map<const ChangeStream*, ModGroup>*
        last_mod_group_by_change_stream) {
  std::vector<WriteOp> write_ops;
  // After the last user WriteOp passed into this buffer, there may be grouped
  // column types and mods by change streams that haven't been converted to
  // DataChangeRecord. Build them into DataChangeRecords before setting
  // is_last_record_in_transaction_in_partition and
  // number_of_records_in_transaction.
  for (auto& [change_stream, mod_group] : *last_mod_group_by_change_stream) {
    // operator[] creates the record list when the stream has none yet, so no
    // separate existence check is needed.
    std::vector<DataChangeRecord>& records =
        (*data_change_records_in_transaction_by_change_stream)[change_stream];
    DataChangeRecord record = BuildDataChangeRecord(
        mod_group.table_name,
        change_stream->value_capture_type().has_value()
            ? change_stream->value_capture_type().value()
            : std::string(kChangeStreamValueCaptureTypeDefault),
        change_stream, transaction_id, records.size(),
        last_mod_group_by_change_stream);
    records.push_back(record);
  }
  for (auto& [change_stream, records] :
       *data_change_records_in_transaction_by_change_stream) {
    // Nothing to emit, and records.back() below would be out of bounds.
    if (records.empty()) {
      continue;
    }
    std::vector<const Column*> columns;
    // Each change_stream has one change_stream_data_table
    for (const Column* column :
         change_stream->change_stream_data_table()->columns()) {
      columns.push_back(column);
    }
    const int64_t number_of_records_in_transaction = records.size();
    records.back().is_last_record_in_transaction_in_partition = true;
    // Iterate by value: the record count is set on the emitted copy only.
    for (DataChangeRecord record : records) {
      record.number_of_records_in_transaction =
          number_of_records_in_transaction;
      // NOTE(review): value() does not handle a conversion error (e.g. key too
      // large); this function's return type has no way to report a Status.
      write_ops.push_back(ConvertDataChangeRecordToWriteOp(
                              change_stream, std::move(record), columns)
                              .value());
    }
  }
  return write_ops;
}
// Produces the WriteOps that record `buffered_write_ops` in the data tables of
// every change stream tracking the affected tables.
//
// schema: used to find which change streams track which tables.
// buffered_write_ops: the user write ops of the transaction, in order.
// store: used to look up partition tokens and existing row values.
// transaction_id: stored in each data change record.
//
// Returns an error if a partition token cannot be retrieved or if logging a
// mod fails.
absl::StatusOr<std::vector<WriteOp>> BuildChangeStreamWriteOps(
    const Schema* schema, std::vector<WriteOp> buffered_write_ops,
    ReadOnlyStore* store, TransactionID transaction_id) {
  // Map for change streams and their partition tokens within the transaction.
  absl::flat_hash_map<const ChangeStream*, zetasql::Value>
      change_stream_with_partition_token;
  // Map for tables and the change streams tracking the tables or columns
  // included in the tables.
  const absl::flat_hash_map<const Table*, std::vector<const ChangeStream*>>
      table_with_tracked_change_streams =
          RetrieveTableWithTrackedChangeStreams(schema);
  // Map for change streams and their DataChangeRecords
  absl::flat_hash_map<const ChangeStream*, std::vector<DataChangeRecord>>
      data_change_records_in_transaction_by_change_stream;
  // Map for change streams and their ModGroups
  absl::flat_hash_map<const ChangeStream*, ModGroup>
      last_mod_group_by_change_stream;
  for (const auto& write_op : buffered_write_ops) {
    const Table* table = TableOf(write_op);
    // Look up without inserting: tables no change stream tracks are skipped.
    const auto tracking = table_with_tracked_change_streams.find(table);
    if (tracking == table_with_tracked_change_streams.end()) {
      continue;
    }
    for (const ChangeStream* change_stream : tracking->second) {
      // The partition token is resolved once per change stream and reused for
      // the rest of the transaction.
      auto token = change_stream_with_partition_token.find(change_stream);
      if (token == change_stream_with_partition_token.end()) {
        // Propagate lookup failures instead of crashing on value().
        ZETASQL_ASSIGN_OR_RETURN(
            zetasql::Value partition_token,
            RetrieveChangeStreamWithPartitionToken(store, change_stream));
        token = change_stream_with_partition_token
                    .emplace(change_stream, std::move(partition_token))
                    .first;
      }
      ZETASQL_RETURN_IF_ERROR(
          LogTableMod(write_op, change_stream, token->second,
                      &data_change_records_in_transaction_by_change_stream,
                      transaction_id, &last_mod_group_by_change_stream, store));
    }
  }
  return BuildMutation(&data_change_records_in_transaction_by_change_stream,
                       transaction_id, &last_mod_group_by_change_stream);
}
} // namespace backend
} // namespace emulator
} // namespace spanner
} // namespace google