in backend/actions/change_stream.cc [637:725]
// Converts a DataChangeRecord into an insert against the change stream's
// data table.
//
// The row key is computed by ComputeChangeStreamDataTableKey from the record's
// partition token, commit timestamp, server transaction id and record
// sequence, plus the name of the change stream's data table. If that
// computation fails, its error status is returned.
//
// The row values are appended in this fixed order: partition token, commit
// timestamp, server transaction id, record sequence,
// is-last-record-in-transaction-in-partition flag, tracked table name, four
// parallel column-type arrays (name, type, is_primary_key, ordinal_position),
// three parallel per-mod arrays (keys, new values, old values; each element is
// a JSON string), mod type, value capture type, number of records in
// transaction, number of partitions in transaction, transaction tag and the
// is-system-transaction flag.
// NOTE(review): `values` is presumably matched positionally against `columns`
// by whoever consumes the InsertOp -- confirm with callers before reordering.
absl::StatusOr<WriteOp> ConvertDataChangeRecordToWriteOp(
    const ChangeStream* change_stream, DataChangeRecord record,
    std::vector<const Column*> columns) {
  // Compute change_stream_data_table key
  ZETASQL_ASSIGN_OR_RETURN(
      Key change_stream_data_table_key,
      ComputeChangeStreamDataTableKey(
          record.partition_token, record.commit_timestamp,
          record.server_transaction_id, record.record_sequence,
          change_stream->change_stream_data_table()->Name()));

  // Scalar fields that identify the record.
  std::vector<zetasql::Value> values;
  values.push_back(record.partition_token);
  values.push_back(record.commit_timestamp);
  values.push_back(zetasql::Value::String(record.server_transaction_id));
  values.push_back(zetasql::Value::String(record.record_sequence));
  values.push_back(zetasql::Value::Bool(
      record.is_last_record_in_transaction_in_partition));
  values.push_back(zetasql::Value::String(record.tracked_table_name));

  // Column type metadata is stored as four parallel arrays; element i of each
  // array describes the i-th entry of record.column_types.
  std::vector<zetasql::Value> column_types_name;
  std::vector<zetasql::Value> column_types_type;
  std::vector<zetasql::Value> column_types_is_primary_key;
  std::vector<zetasql::Value> column_types_ordinal_position;
  column_types_name.reserve(record.column_types.size());
  column_types_type.reserve(record.column_types.size());
  column_types_is_primary_key.reserve(record.column_types.size());
  column_types_ordinal_position.reserve(record.column_types.size());
  for (const ColumnType& column_type : record.column_types) {
    column_types_name.push_back(zetasql::Value::String(column_type.name));
    column_types_type.push_back(
        zetasql::Value::String(GsqlTypeToSpannerType(column_type.type)));
    column_types_is_primary_key.push_back(
        zetasql::Value::Bool(column_type.is_primary_key));
    column_types_ordinal_position.push_back(
        zetasql::Value::Int64(column_type.ordinal_position));
  }
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                          column_types_name));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                          column_types_type));
  values.push_back(zetasql::values::Array(zetasql::types::BoolArrayType(),
                                          column_types_is_primary_key));
  values.push_back(zetasql::values::Array(zetasql::types::Int64ArrayType(),
                                          column_types_ordinal_position));

  // Mods are likewise stored as three parallel arrays of JSON strings.
  std::vector<zetasql::Value> mods_keys;
  std::vector<zetasql::Value> mods_new_values;
  std::vector<zetasql::Value> mods_old_values;
  mods_keys.reserve(record.mods.size());
  mods_new_values.reserve(record.mods.size());
  mods_old_values.reserve(record.mods.size());
  for (const Mod& mod : record.mods) {
    // Build the keys JSON from a fresh document for every mod. Reusing one
    // document across mods would let a key member written for an earlier mod
    // leak into the serialized keys of a later mod that does not overwrite it.
    zetasql::JSONValue keys_json;
    JSONValueRef keys_ref = keys_json.GetRef();
    // NOTE(review): assumes mod.keys has at least mod.key_columns.size()
    // entries -- confirm that whoever builds Mod guarantees this.
    for (size_t i = 0; i < mod.key_columns.size(); ++i) {
      JSONValueRef key_col_ref =
          keys_ref.GetMember(mod.key_columns[i]->column()->Name());
      // In theory, key values can't be null but double check to avoid any
      // potential crash
      if (mod.keys[i].is_null()) {
        key_col_ref.SetNull();
      } else {
        CloudValueToJSONValue(mod.keys[i], key_col_ref);
      }
    }
    mods_keys.push_back(zetasql::Value::String(keys_ref.ToString()));

    // A mod without new values is recorded with the kMinimumValidJson
    // placeholder instead of a serialized value list.
    if (mod.new_values.empty()) {
      mods_new_values.push_back(zetasql::Value::String(kMinimumValidJson));
    } else {
      mods_new_values.push_back(zetasql::Value::String(
          CloudListValueToJSONString(mod.non_key_columns, mod.new_values)));
    }

    // Same placeholder rule for old values.
    if (mod.old_values.empty()) {
      mods_old_values.push_back(zetasql::Value::String(kMinimumValidJson));
    } else {
      mods_old_values.push_back(zetasql::Value::String(
          CloudListValueToJSONString(mod.non_key_columns, mod.old_values)));
    }
  }
  values.push_back(
      zetasql::values::Array(zetasql::types::StringArrayType(), mods_keys));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                          mods_new_values));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                          mods_old_values));

  // Transaction-level metadata.
  values.push_back(zetasql::Value::String(record.mod_type));
  values.push_back(zetasql::Value::String(record.value_capture_type));
  values.push_back(
      zetasql::Value::Int64(record.number_of_records_in_transaction));
  values.push_back(
      zetasql::Value::Int64(record.number_of_partitions_in_transaction));
  values.push_back(zetasql::Value::String(record.transaction_tag));
  values.push_back(zetasql::Value::Bool(record.is_system_transaction));

  return InsertOp{change_stream->change_stream_data_table(),
                  change_stream_data_table_key, columns, values};
}