absl::StatusOr ConvertDataChangeRecordToWriteOp()

in backend/actions/change_stream.cc [637:725]


absl::StatusOr<WriteOp> ConvertDataChangeRecordToWriteOp(
    const ChangeStream* change_stream, DataChangeRecord record,
    std::vector<const Column*> columns) {
  // Compute change_stream_data_table key
  ZETASQL_ASSIGN_OR_RETURN(Key change_stream_data_table_key,
                   ComputeChangeStreamDataTableKey(
                       record.partition_token, record.commit_timestamp,
                       record.server_transaction_id, record.record_sequence,
                       change_stream->change_stream_data_table()->Name()));
  std::vector<zetasql::Value> values;
  values.push_back(record.partition_token);
  values.push_back(record.commit_timestamp);
  values.push_back(zetasql::Value::String(record.server_transaction_id));
  values.push_back(zetasql::Value::String(record.record_sequence));
  values.push_back(zetasql::Value::Bool(
      record.is_last_record_in_transaction_in_partition));
  values.push_back(zetasql::Value::String(record.tracked_table_name));
  std::vector<zetasql::Value> column_types_name;
  std::vector<zetasql::Value> column_types_type;
  std::vector<zetasql::Value> column_types_is_primary_key;
  std::vector<zetasql::Value> column_types_ordinal_position;
  for (const ColumnType& column_type : record.column_types) {
    column_types_name.push_back(zetasql::Value::String(column_type.name));
    column_types_type.push_back(
        zetasql::Value::String(GsqlTypeToSpannerType(column_type.type)));
    column_types_is_primary_key.push_back(
        zetasql::Value::Bool(column_type.is_primary_key));
    column_types_ordinal_position.push_back(
        zetasql::Value::Int64(column_type.ordinal_position));
  }
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            column_types_name));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            column_types_type));
  values.push_back(zetasql::values::Array(zetasql::types::BoolArrayType(),
                                            column_types_is_primary_key));
  values.push_back(zetasql::values::Array(zetasql::types::Int64ArrayType(),
                                            column_types_ordinal_position));
  std::vector<zetasql::Value> mods_keys;
  std::vector<zetasql::Value> mods_new_values;
  std::vector<zetasql::Value> mods_old_values;
  zetasql::JSONValue keys_json;
  JSONValueRef ref = keys_json.GetRef();
  for (const Mod& mod : record.mods) {
    for (int i = 0; i < mod.key_columns.size(); ++i) {
      JSONValueRef key_col_ref =
          ref.GetMember(mod.key_columns[i]->column()->Name());
      // In theory, key values can't be null but double check to avoid any
      // potential crash
      if (mod.keys[i].is_null()) {
        key_col_ref.SetNull();
      } else {
        CloudValueToJSONValue(mod.keys[i], key_col_ref);
      }
    }
    mods_keys.push_back(zetasql::Value::String(ref.ToString()));

    if (mod.new_values.empty()) {
      mods_new_values.push_back(zetasql::Value::String(kMinimumValidJson));
    } else {
      std::string new_values_json_str =
          CloudListValueToJSONString(mod.non_key_columns, mod.new_values);
      mods_new_values.push_back(zetasql::Value::String(new_values_json_str));
    }
    if (mod.old_values.empty()) {
      mods_old_values.push_back(zetasql::Value::String(kMinimumValidJson));
    } else {
      std::string old_values_json_str =
          CloudListValueToJSONString(mod.non_key_columns, mod.old_values);
      mods_old_values.push_back(zetasql::Value::String(old_values_json_str));
    }
  }
  values.push_back(
      zetasql::values::Array(zetasql::types::StringArrayType(), mods_keys));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            mods_new_values));
  values.push_back(zetasql::values::Array(zetasql::types::StringArrayType(),
                                            mods_old_values));
  values.push_back(zetasql::Value::String(record.mod_type));
  values.push_back(zetasql::Value::String(record.value_capture_type));
  values.push_back(
      zetasql::Value::Int64(record.number_of_records_in_transaction));
  values.push_back(
      zetasql::Value::Int64(record.number_of_partitions_in_transaction));
  values.push_back(zetasql::Value::String(record.transaction_tag));
  values.push_back(zetasql::Value::Bool(record.is_system_transaction));
  return InsertOp{change_stream->change_stream_data_table(),
                  change_stream_data_table_key, columns, values};
}