func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [340:413]


// CreateTable handles a CREATE TABLE statement for the keyspace named in data.
//
// It performs the following steps in order:
//  1. Looks up the data client and admin client registered for data.Keyspace
//     and fails with an "invalid keyspace" error if either is missing.
//  2. Checks the schema mapping table (schemaMappingTableName) for an existing
//     schema for data.Table; when none exists, the schema is written via
//     updateTableSchema.
//  3. Builds the row key schema from data.PrimaryKeys and a column family set
//     containing one family per collection-typed column plus the configured
//     default column family, each keeping a single cell version.
//  4. Creates the Bigtable table. An AlreadyExists response is ignored.
//  5. Returns an error if the schema already existed and data.IfNotExists is
//     false.
//  6. Reloads the schema mappings for the keyspace.
//
// Any error from the steps above is returned to the caller.
func (btc *BigtableClient) CreateTable(ctx context.Context, data *translator.CreateTableStatementMap, schemaMappingTableName string) error {
	client, ok := btc.Clients[data.Keyspace]
	if !ok {
		return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
	}
	adminClient, ok := btc.AdminClients[data.Keyspace]
	if !ok {
		return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
	}

	exists, err := btc.tableSchemaExists(ctx, client, schemaMappingTableName, data.Table)
	if err != nil {
		return err
	}

	// Only write the schema mapping when it is not already recorded.
	if !exists {
		btc.Logger.Info("updating table schema")
		if err := btc.updateTableSchema(ctx, data.Keyspace, schemaMappingTableName, data.Table, data.PrimaryKeys, data.Columns, nil); err != nil {
			return err
		}
	}

	// One row key field per primary key, in declaration order.
	rowKeySchemaFields := make([]bigtable.StructField, 0, len(data.PrimaryKeys))
	for _, key := range data.PrimaryKeys {
		part, err := createBigtableRowKeyField(key.Name, data.Columns, btc.BigtableConfig.EncodeIntValuesWithBigEndian)
		if err != nil {
			return err
		}
		rowKeySchemaFields = append(rowKeySchemaFields, part)
	}

	// Every family created here keeps only the latest cell version.
	singleVersionFamily := bigtable.Family{
		GCPolicy: bigtable.MaxVersionsPolicy(1),
	}
	// Collection-typed columns each get a column family named after the
	// column; the default column family is always present.
	columnFamilies := make(map[string]bigtable.Family)
	for _, col := range data.Columns {
		if utilities.IsCollectionDataType(col.Type) {
			columnFamilies[col.Name] = singleVersionFamily
		}
	}
	columnFamilies[btc.BigtableConfig.DefaultColumnFamily] = singleVersionFamily

	btc.Logger.Info("creating bigtable table")
	err = adminClient.CreateTableFromConf(ctx, &bigtable.TableConf{
		TableID:        data.Table,
		ColumnFamilies: columnFamilies,
		RowKeySchema: &bigtable.StructType{
			Fields:   rowKeySchemaFields,
			Encoding: bigtable.StructOrderedCodeBytesEncoding{},
		},
	})
	// ignore already exists errors - the schema mapping table is the SoT
	if err != nil && status.Code(err) != codes.AlreadyExists {
		btc.Logger.Error("failed to create bigtable table", zap.Error(err))
		return err
	}

	// NOTE(review): this rejection runs after the Bigtable table creation
	// attempt above, so a rejected statement may still have created a missing
	// Bigtable table. Confirm this ordering is intended before moving the check.
	if exists && !data.IfNotExists {
		return fmt.Errorf("cannot create table %s because it already exists", data.Table)
	}

	btc.Logger.Info("reloading schema mappings")
	if err := btc.reloadSchemaMappings(ctx, data.Keyspace, schemaMappingTableName); err != nil {
		return err
	}

	return nil
}