func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [468:517]


// updateTableSchema records column additions and removals for tableName in the
// schema mapping table (schemaMappingTableName) of the given keyspace.
//
// For every entry in addCols, a row keyed "<tableName>#<columnName>" is written
// with the column's name, type, collection flag and primary-key details. A
// column is treated as a primary key when a config with the same name exists in
// pmks; its PK_Precedence is the 1-based position in pmks, or 0 for non-key
// columns. For every name in dropCols, the row with the matching key is deleted.
// All mutations are sent in one ApplyBulk call.
//
// An error is returned when the keyspace has no registered client, when the
// bulk request fails as a whole, or when any individual row mutation fails.
func (btc *BigtableClient) updateTableSchema(ctx context.Context, keyspace string, schemaMappingTableName string, tableName string, pmks []translator.CreateTablePrimaryKeyConfig, addCols []message.ColumnMetadata, dropCols []string) error {
	client, exists := btc.Clients[keyspace]
	if !exists {
		return fmt.Errorf("invalid keyspace `%s`", keyspace)
	}

	// One shared timestamp so every cell written by this update has the same version.
	ts := bigtable.Now()
	total := len(addCols) + len(dropCols)
	muts := make([]*bigtable.Mutation, 0, total)
	rowKeys := make([]string, 0, total)
	for _, col := range addCols {
		mut := bigtable.NewMutation()
		mut.Set(schemaMappingTableColumnFamily, "ColumnName", ts, []byte(col.Name))
		mut.Set(schemaMappingTableColumnFamily, "ColumnType", ts, []byte(col.Type.String()))
		isCollection := utilities.IsCollectionDataType(col.Type)
		mut.Set(schemaMappingTableColumnFamily, "IsCollection", ts, []byte(strconv.FormatBool(isCollection)))
		pmkIndex := slices.IndexFunc(pmks, func(c translator.CreateTablePrimaryKeyConfig) bool {
			return c.Name == col.Name
		})
		mut.Set(schemaMappingTableColumnFamily, "IsPrimaryKey", ts, []byte(strconv.FormatBool(pmkIndex != -1)))
		if pmkIndex != -1 {
			pmkConfig := pmks[pmkIndex]
			mut.Set(schemaMappingTableColumnFamily, "KeyType", ts, []byte(pmkConfig.KeyType))
		} else {
			// overkill, but overwrite any previous KeyType configs which could exist if the table was recreated with different columns
			mut.Set(schemaMappingTableColumnFamily, "KeyType", ts, []byte(""))
		}
		// pmkIndex is -1 for non-key columns, so their precedence is stored as 0.
		mut.Set(schemaMappingTableColumnFamily, "PK_Precedence", ts, []byte(strconv.Itoa(pmkIndex+1)))
		mut.Set(schemaMappingTableColumnFamily, "TableName", ts, []byte(tableName))
		muts = append(muts, mut)
		rowKeys = append(rowKeys, tableName+"#"+col.Name)
	}
	// note: we only remove the column from the schema mapping table and don't actually delete any data from the data table
	for _, col := range dropCols {
		mut := bigtable.NewMutation()
		mut.DeleteRow()
		muts = append(muts, mut)
		rowKeys = append(rowKeys, tableName+"#"+col)
	}

	btc.Logger.Info("updating schema mapping table")
	table := client.Open(schemaMappingTableName)
	rowErrs, err := table.ApplyBulk(ctx, rowKeys, muts)
	if err != nil {
		btc.Logger.Error("update schema mapping table failed")
		return err
	}

	// ApplyBulk reports per-row failures through its first return value while
	// leaving err nil, so each entry must be checked or a rejected mutation
	// would be treated as success.
	for i, rowErr := range rowErrs {
		if rowErr != nil {
			btc.Logger.Error("update schema mapping table failed")
			return fmt.Errorf("updating schema mapping row %q: %w", rowKeys[i], rowErr)
		}
	}

	return nil
}