absl::Status LogTableMod()

in backend/actions/change_stream.cc [320:422]


absl::Status LogTableMod(
    const Key& key, std::vector<const Column*> modified_tracked_columns,
    const std::vector<zetasql::Value>& modified_tracked_values,
    const Table* tracked_table, const ChangeStream* change_stream,
    absl::string_view mod_type, zetasql::Value partition_token,
    absl::flat_hash_map<const ChangeStream*, std::vector<DataChangeRecord>>*
        data_change_records_in_transaction_by_change_stream,
    TransactionID transaction_id,
    absl::flat_hash_map<const ChangeStream*, ModGroup>*
        last_mod_group_by_change_stream,
    ReadOnlyStore* store) {
  std::string value_capture_type =
      change_stream->value_capture_type().has_value()
          ? change_stream->value_capture_type().value()
          : std::string(kChangeStreamValueCaptureTypeDefault);
  std::vector<const Column*> tracked_columns =
      GetColumnsForDataChangeRecord(value_capture_type, mod_type, tracked_table,
                                    modified_tracked_columns, change_stream);
  if (last_mod_group_by_change_stream->contains(change_stream)) {
    ModGroup& last_mod_group =
        (*last_mod_group_by_change_stream)[change_stream];
    bool same_non_pk_columns = CheckIfNonKeyColumnsRemainSame(
        tracked_columns, last_mod_group, tracked_table, change_stream);
    if (last_mod_group.mod_type != mod_type ||
        last_mod_group.table_name != tracked_table->Name() ||
        !same_non_pk_columns) {
      DataChangeRecord record = BuildDataChangeRecord(
          last_mod_group.table_name, value_capture_type, change_stream,
          transaction_id,
          (*data_change_records_in_transaction_by_change_stream)[change_stream]
              .size(),
          last_mod_group_by_change_stream);
      last_mod_group_by_change_stream->erase(change_stream);
      (*data_change_records_in_transaction_by_change_stream)[change_stream]
          .push_back(record);
    }
  }

  std::vector<std::string> modified_tracked_column_names;
  modified_tracked_column_names.reserve(modified_tracked_columns.size());
  for (const Column* column : modified_tracked_columns) {
    modified_tracked_column_names.push_back(column->Name());
  }

  ZETASQL_ASSIGN_OR_RETURN(std::vector<zetasql::Value> updated_values,
                   GetNewValuesForDataChangeRecord(
                       value_capture_type, mod_type, tracked_table,
                       modified_tracked_column_names, modified_tracked_values,
                       key, store, tracked_columns));

  std::vector<zetasql::Value> new_values_for_tracked_cols;
  std::vector<zetasql::Value> old_values_for_tracked_cols;
  std::vector<ColumnType> column_types;
  std::vector<std::string> non_key_cols;
  for (int i = 0; i < tracked_columns.size(); ++i) {
    const Column* column = tracked_columns[i];
    bool is_primary_key = IsPrimaryKey(tracked_table, column);
    int64_t ordinal_position = GetOrdinalPosition(tracked_table, column);
    ColumnType column_type{column->Name(), column->GetType(), is_primary_key,
                           ordinal_position};
    column_types.push_back(column_type);
    if (!is_primary_key) {
      non_key_cols.push_back(column->Name());
      if (mod_type != kDelete) {
        new_values_for_tracked_cols.push_back(updated_values[i]);
      }
    }
  }

  ZETASQL_ASSIGN_OR_RETURN(
      old_values_for_tracked_cols,
      GetOldValuesForDataChangeRecord(
          value_capture_type, mod_type, tracked_table, key, store, non_key_cols,
          absl::flat_hash_set<std::string>{
              modified_tracked_column_names.begin(),
              modified_tracked_column_names.end()}));
  std::sort(
      column_types.begin(), column_types.end(),
      [](const ColumnType& col_type_a, const ColumnType& col_type_b) {
        return (col_type_a.ordinal_position < col_type_b.ordinal_position);
      });
  if (!new_values_for_tracked_cols.empty() || mod_type != kUpdate) {
    if (!last_mod_group_by_change_stream->contains(change_stream)) {
      (*last_mod_group_by_change_stream)[change_stream] =
          ModGroup{.table_name = tracked_table->Name(),
                   .mod_type = mod_type,
                   .non_key_column_names = {},
                   .column_types = {},
                   .mods = {},
                   .partition_token_str = partition_token};
    }
    (*last_mod_group_by_change_stream)[change_stream].table_name =
        tracked_table->Name();
    (*last_mod_group_by_change_stream)[change_stream].non_key_column_names = {
        non_key_cols.begin(), non_key_cols.end()};
    (*last_mod_group_by_change_stream)[change_stream].column_types = {
        column_types.begin(), column_types.end()};
    Mod mod{tracked_table->primary_key(), non_key_cols, key.column_values(),
            new_values_for_tracked_cols, old_values_for_tracked_cols};
    (*last_mod_group_by_change_stream)[change_stream].mods.push_back(mod);
  }
  return absl::OkStatus();
}