in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [616:722]
// GetSchemaMappingConfigs scans the schema mapping table of the given keyspace
// and turns every row into a schemaMapping.Column.
//
// It returns two lookups keyed by table name:
//   - a map from column name to a pointer to that column's definition, and
//   - the primary-key columns of the table, passed through sortPkData.
//
// Every table seen in the scan gets an entry in both lookups, even when it has
// no primary-key columns. An error is returned when the keyspace has no
// registered client, when the schema mapping table cannot be ensured or read,
// or when a row holds a PK_Precedence or ColumnType value that cannot be parsed.
func (btc *BigtableClient) GetSchemaMappingConfigs(ctx context.Context, keyspace, schemaMappingTable string) (map[string]map[string]*schemaMapping.Column, map[string][]schemaMapping.Column, error) {
	// With nothing cached yet this may be the very first load, so make sure the
	// schema mapping table is there before trying to read from it.
	alreadyLoaded := btc.SchemaMappingConfig != nil && len(btc.SchemaMappingConfig.TablesMetaData) > 0
	if !alreadyLoaded {
		if err := btc.createSchemaMappingTableMaybe(ctx, keyspace, schemaMappingTable); err != nil {
			return nil, nil, err
		}
	}
	otelgo.AddAnnotation(ctx, fetchingSchemaMappingConfig)

	keyspaceClient, found := btc.Clients[keyspace]
	if !found {
		return nil, nil, fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}
	mappingTable := keyspaceClient.Open(schemaMappingTable)

	columnsByTable := make(map[string]map[string]*schemaMapping.Column)
	primaryKeysByTable := make(map[string][]schemaMapping.Column)

	// Cells are addressed as "<family>:<qualifier>".
	qualifierPrefix := schemaMappingTableColumnFamily + ":"

	// parseErr carries a failure out of the row callback, which can only
	// signal "stop" through its boolean result.
	var parseErr error
	handleRow := func(row bigtable.Row) bool {
		var tableName, columnName, columnType, keyType string
		var isPrimaryKey, isCollection bool
		var pkPrecedence int

		// Pull the individual attributes of the column definition out of the row.
		for _, cell := range row[schemaMappingTableColumnFamily] {
			value := string(cell.Value)
			switch cell.Column {
			case qualifierPrefix + "TableName":
				tableName = value
			case qualifierPrefix + "ColumnName":
				columnName = value
			case qualifierPrefix + "ColumnType":
				columnType = value
			case qualifierPrefix + "IsPrimaryKey":
				isPrimaryKey = value == "true"
			case qualifierPrefix + "PK_Precedence":
				pkPrecedence, parseErr = strconv.Atoi(value)
				if parseErr != nil {
					return false
				}
			case qualifierPrefix + "IsCollection":
				isCollection = value == "true"
			case qualifierPrefix + "KeyType":
				keyType = value
			}
		}

		cqlType, typeErr := utilities.GetCassandraColumnType(columnType)
		if typeErr != nil {
			parseErr = typeErr
			return false
		}

		tableColumns, known := columnsByTable[tableName]
		if !known {
			tableColumns = make(map[string]*schemaMapping.Column)
			columnsByTable[tableName] = tableColumns
		}

		// TODO: this index is probably wrong. It reflects the order in which
		// Bigtable returns the rows for this table, not the order in which the
		// columns were originally written, so adding a column whose name sorts
		// before existing ones can shift the index of the others
		// (example: "age" vs. ["name", "id", "email"]).
		position := int32(len(tableColumns))

		metadata := message.ColumnMetadata{
			Keyspace: keyspace,
			Table:    tableName,
			Name:     columnName,
			Type:     cqlType,
			Index:    position,
		}
		definition := &schemaMapping.Column{
			ColumnName:   columnName,
			ColumnType:   columnType,
			IsPrimaryKey: isPrimaryKey,
			PkPrecedence: pkPrecedence,
			IsCollection: isCollection,
			Metadata:     metadata,
			KeyType:      keyType,
		}
		tableColumns[columnName] = definition

		// Writing the slice back unconditionally guarantees the table has an
		// entry in the primary-key lookup even if none of its columns is a key.
		primaryKeys := primaryKeysByTable[tableName]
		if isPrimaryKey {
			primaryKeys = append(primaryKeys, *definition)
		}
		primaryKeysByTable[tableName] = primaryKeys
		return true
	}

	latestOnly := bigtable.RowFilter(bigtable.LatestNFilter(1))
	scanErr := mappingTable.ReadRows(ctx, bigtable.InfiniteRange(""), handleRow, latestOnly)
	// Fold the callback's own failure in when ReadRows itself reported none, so
	// a single check below covers both.
	if scanErr == nil {
		scanErr = parseErr
	}
	if scanErr != nil {
		btc.Logger.Error("Failed to read rows from Bigtable - possible issue with schema_mapping table:", zap.Error(scanErr))
		return nil, nil, scanErr
	}

	otelgo.AddAnnotation(ctx, schemaMappingConfigFetched)
	return columnsByTable, sortPkData(primaryKeysByTable), nil
}