in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [468:517]
func (btc *BigtableClient) updateTableSchema(ctx context.Context, keyspace string, schemaMappingTableName string, tableName string, pmks []translator.CreateTablePrimaryKeyConfig, addCols []message.ColumnMetadata, dropCols []string) error {
client, exists := btc.Clients[keyspace]
if !exists {
return fmt.Errorf("invalid keyspace `%s`", keyspace)
}
ts := bigtable.Now()
var muts []*bigtable.Mutation
var rowKeys []string
for _, col := range addCols {
mut := bigtable.NewMutation()
mut.Set(schemaMappingTableColumnFamily, "ColumnName", ts, []byte(col.Name))
mut.Set(schemaMappingTableColumnFamily, "ColumnType", ts, []byte(col.Type.String()))
isCollection := utilities.IsCollectionDataType(col.Type)
mut.Set(schemaMappingTableColumnFamily, "IsCollection", ts, []byte(strconv.FormatBool(isCollection)))
pmkIndex := slices.IndexFunc(pmks, func(c translator.CreateTablePrimaryKeyConfig) bool {
return c.Name == col.Name
})
mut.Set(schemaMappingTableColumnFamily, "IsPrimaryKey", ts, []byte(strconv.FormatBool(pmkIndex != -1)))
if pmkIndex != -1 {
pmkConfig := pmks[pmkIndex]
mut.Set(schemaMappingTableColumnFamily, "KeyType", ts, []byte(pmkConfig.KeyType))
} else {
// overkill, but overwrite any previous KeyType configs which could exist if the table was recreated with different columns
mut.Set(schemaMappingTableColumnFamily, "KeyType", ts, []byte(""))
}
mut.Set(schemaMappingTableColumnFamily, "PK_Precedence", ts, []byte(strconv.Itoa(pmkIndex+1)))
mut.Set(schemaMappingTableColumnFamily, "TableName", ts, []byte(tableName))
muts = append(muts, mut)
rowKeys = append(rowKeys, tableName+"#"+col.Name)
}
// note: we only remove the column from the schema mapping table and don't actually delete any data from the data table
for _, col := range dropCols {
mut := bigtable.NewMutation()
mut.DeleteRow()
muts = append(muts, mut)
rowKeys = append(rowKeys, tableName+"#"+col)
}
btc.Logger.Info("updating schema mapping table")
table := client.Open(schemaMappingTableName)
_, err := table.ApplyBulk(ctx, rowKeys, muts)
if err != nil {
btc.Logger.Error("update schema mapping table failed")
return err
}
return nil
}