in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [340:413]
func (btc *BigtableClient) CreateTable(ctx context.Context, data *translator.CreateTableStatementMap, schemaMappingTableName string) error {
client, ok := btc.Clients[data.Keyspace]
if !ok {
return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
}
adminClient, ok := btc.AdminClients[data.Keyspace]
if !ok {
return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
}
exists, err := btc.tableSchemaExists(ctx, client, schemaMappingTableName, data.Table)
if err != nil {
return err
}
if !exists {
btc.Logger.Info("updating table schema")
err = btc.updateTableSchema(ctx, data.Keyspace, schemaMappingTableName, data.Table, data.PrimaryKeys, data.Columns, nil)
if err != nil {
return err
}
}
var rowKeySchemaFields []bigtable.StructField
for _, key := range data.PrimaryKeys {
part, err := createBigtableRowKeyField(key.Name, data.Columns, btc.BigtableConfig.EncodeIntValuesWithBigEndian)
if err != nil {
return err
}
rowKeySchemaFields = append(rowKeySchemaFields, part)
}
columnFamilies := make(map[string]bigtable.Family)
for _, col := range data.Columns {
if utilities.IsCollectionDataType(col.Type) {
columnFamilies[col.Name] = bigtable.Family{
GCPolicy: bigtable.MaxVersionsPolicy(1),
}
}
}
columnFamilies[btc.BigtableConfig.DefaultColumnFamily] = bigtable.Family{
GCPolicy: bigtable.MaxVersionsPolicy(1),
}
btc.Logger.Info("creating bigtable table")
err = adminClient.CreateTableFromConf(ctx, &bigtable.TableConf{
TableID: data.Table,
ColumnFamilies: columnFamilies,
RowKeySchema: &bigtable.StructType{
Fields: rowKeySchemaFields,
Encoding: bigtable.StructOrderedCodeBytesEncoding{},
},
})
// ignore already exists errors - the schema mapping table is the SoT
if status.Code(err) == codes.AlreadyExists {
err = nil
}
if err != nil {
btc.Logger.Error("failed to create bigtable table", zap.Error(err))
return err
}
if exists && !data.IfNotExists {
return fmt.Errorf("cannot create table %s becauase it already exists", data.Table)
}
btc.Logger.Info("reloading schema mappings")
err = btc.reloadSchemaMappings(ctx, data.Keyspace, schemaMappingTableName)
if err != nil {
return err
}
return nil
}