in backend/actions/change_stream.cc [320:422]
absl::Status LogTableMod(
const Key& key, std::vector<const Column*> modified_tracked_columns,
const std::vector<zetasql::Value>& modified_tracked_values,
const Table* tracked_table, const ChangeStream* change_stream,
absl::string_view mod_type, zetasql::Value partition_token,
absl::flat_hash_map<const ChangeStream*, std::vector<DataChangeRecord>>*
data_change_records_in_transaction_by_change_stream,
TransactionID transaction_id,
absl::flat_hash_map<const ChangeStream*, ModGroup>*
last_mod_group_by_change_stream,
ReadOnlyStore* store) {
std::string value_capture_type =
change_stream->value_capture_type().has_value()
? change_stream->value_capture_type().value()
: std::string(kChangeStreamValueCaptureTypeDefault);
std::vector<const Column*> tracked_columns =
GetColumnsForDataChangeRecord(value_capture_type, mod_type, tracked_table,
modified_tracked_columns, change_stream);
if (last_mod_group_by_change_stream->contains(change_stream)) {
ModGroup& last_mod_group =
(*last_mod_group_by_change_stream)[change_stream];
bool same_non_pk_columns = CheckIfNonKeyColumnsRemainSame(
tracked_columns, last_mod_group, tracked_table, change_stream);
if (last_mod_group.mod_type != mod_type ||
last_mod_group.table_name != tracked_table->Name() ||
!same_non_pk_columns) {
DataChangeRecord record = BuildDataChangeRecord(
last_mod_group.table_name, value_capture_type, change_stream,
transaction_id,
(*data_change_records_in_transaction_by_change_stream)[change_stream]
.size(),
last_mod_group_by_change_stream);
last_mod_group_by_change_stream->erase(change_stream);
(*data_change_records_in_transaction_by_change_stream)[change_stream]
.push_back(record);
}
}
std::vector<std::string> modified_tracked_column_names;
modified_tracked_column_names.reserve(modified_tracked_columns.size());
for (const Column* column : modified_tracked_columns) {
modified_tracked_column_names.push_back(column->Name());
}
ZETASQL_ASSIGN_OR_RETURN(std::vector<zetasql::Value> updated_values,
GetNewValuesForDataChangeRecord(
value_capture_type, mod_type, tracked_table,
modified_tracked_column_names, modified_tracked_values,
key, store, tracked_columns));
std::vector<zetasql::Value> new_values_for_tracked_cols;
std::vector<zetasql::Value> old_values_for_tracked_cols;
std::vector<ColumnType> column_types;
std::vector<std::string> non_key_cols;
for (int i = 0; i < tracked_columns.size(); ++i) {
const Column* column = tracked_columns[i];
bool is_primary_key = IsPrimaryKey(tracked_table, column);
int64_t ordinal_position = GetOrdinalPosition(tracked_table, column);
ColumnType column_type{column->Name(), column->GetType(), is_primary_key,
ordinal_position};
column_types.push_back(column_type);
if (!is_primary_key) {
non_key_cols.push_back(column->Name());
if (mod_type != kDelete) {
new_values_for_tracked_cols.push_back(updated_values[i]);
}
}
}
ZETASQL_ASSIGN_OR_RETURN(
old_values_for_tracked_cols,
GetOldValuesForDataChangeRecord(
value_capture_type, mod_type, tracked_table, key, store, non_key_cols,
absl::flat_hash_set<std::string>{
modified_tracked_column_names.begin(),
modified_tracked_column_names.end()}));
std::sort(
column_types.begin(), column_types.end(),
[](const ColumnType& col_type_a, const ColumnType& col_type_b) {
return (col_type_a.ordinal_position < col_type_b.ordinal_position);
});
if (!new_values_for_tracked_cols.empty() || mod_type != kUpdate) {
if (!last_mod_group_by_change_stream->contains(change_stream)) {
(*last_mod_group_by_change_stream)[change_stream] =
ModGroup{.table_name = tracked_table->Name(),
.mod_type = mod_type,
.non_key_column_names = {},
.column_types = {},
.mods = {},
.partition_token_str = partition_token};
}
(*last_mod_group_by_change_stream)[change_stream].table_name =
tracked_table->Name();
(*last_mod_group_by_change_stream)[change_stream].non_key_column_names = {
non_key_cols.begin(), non_key_cols.end()};
(*last_mod_group_by_change_stream)[change_stream].column_types = {
column_types.begin(), column_types.end()};
Mod mod{tracked_table->primary_key(), non_key_cols, key.column_values(),
new_values_for_tracked_cols, old_values_for_tracked_cols};
(*last_mod_group_by_change_stream)[change_stream].mods.push_back(mod);
}
return absl::OkStatus();
}