func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go [616:722]


// GetSchemaMappingConfigs scans the whole schemaMappingTable of the given
// keyspace and turns each row into a schemaMapping.Column.
//
// Only the latest version of every cell is read, and only cells in the
// schemaMappingTableColumnFamily family are decoded. Each row contributes one
// column definition to the table named by its TableName cell.
//
// It returns:
//   - a map of table name -> column name -> column definition;
//   - the result of sortPkData applied to a map of table name -> primary-key
//     columns of that table;
//   - an error if the schema mapping table could not be ensured on first use,
//     if no client is registered for keyspace, if the scan itself fails, or if
//     a row carries a non-integer PK_Precedence or a ColumnType rejected by
//     utilities.GetCassandraColumnType. Both maps are nil when an error is
//     returned.
func (btc *BigtableClient) GetSchemaMappingConfigs(ctx context.Context, keyspace, schemaMappingTable string) (map[string]map[string]*schemaMapping.Column, map[string][]schemaMapping.Column, error) {
	// With no table metadata loaded yet this is treated as the first fetch, so
	// make sure the schema mapping table is there before reading from it.
	firstLoad := btc.SchemaMappingConfig == nil || len(btc.SchemaMappingConfig.TablesMetaData) == 0
	if firstLoad {
		if err := btc.createSchemaMappingTableMaybe(ctx, keyspace, schemaMappingTable); err != nil {
			return nil, nil, err
		}
	}

	otelgo.AddAnnotation(ctx, fetchingSchemaMappingConfig)
	keyspaceClient, found := btc.Clients[keyspace]
	if !found {
		return nil, nil, fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}

	columnsByTable := make(map[string]map[string]*schemaMapping.Column)
	primaryKeysByTable := make(map[string][]schemaMapping.Column)

	// Cell qualifiers are matched as "<family>:<field>".
	qualifierPrefix := schemaMappingTableColumnFamily + ":"

	// rowErr carries a decoding failure out of the callback, which can only
	// signal "stop" to ReadRows through its boolean result.
	var rowErr error
	decodeRow := func(row bigtable.Row) bool {
		var (
			tableName, columnName, columnType, keyType string
			isPrimaryKey, isCollection                 bool
			pkPrecedence                               int
		)

		// Pull the known fields out of the row; unrecognised cells are ignored.
		for _, cell := range row[schemaMappingTableColumnFamily] {
			value := string(cell.Value)
			switch cell.Column {
			case qualifierPrefix + "TableName":
				tableName = value
			case qualifierPrefix + "ColumnName":
				columnName = value
			case qualifierPrefix + "ColumnType":
				columnType = value
			case qualifierPrefix + "IsPrimaryKey":
				isPrimaryKey = value == "true"
			case qualifierPrefix + "PK_Precedence":
				parsed, convErr := strconv.Atoi(value)
				if convErr != nil {
					rowErr = convErr
					return false
				}
				pkPrecedence = parsed
			case qualifierPrefix + "IsCollection":
				isCollection = value == "true"
			case qualifierPrefix + "KeyType":
				keyType = value
			}
		}

		cqlType, typeErr := utilities.GetCassandraColumnType(columnType)
		if typeErr != nil {
			rowErr = typeErr
			return false
		}

		tableColumns, known := columnsByTable[tableName]
		if !known {
			tableColumns = make(map[string]*schemaMapping.Column)
			columnsByTable[tableName] = tableColumns
		}
		// Register the table in the primary-key map even before any key column
		// has been seen for it.
		if _, known := primaryKeysByTable[tableName]; !known {
			primaryKeysByTable[tableName] = nil
		}

		definition := &schemaMapping.Column{
			ColumnName:   columnName,
			ColumnType:   columnType,
			IsPrimaryKey: isPrimaryKey,
			PkPrecedence: pkPrecedence,
			IsCollection: isCollection,
			KeyType:      keyType,
			Metadata: message.ColumnMetadata{
				Keyspace: keyspace,
				Table:    tableName,
				Name:     columnName,
				Type:     cqlType,
				// TODO: this index is probably wrong. It reflects the order in
				// which this table's rows come back from Bigtable, not the order
				// the columns were originally written in, so adding a column
				// whose name sorts before existing ones (e.g. "age" vs.
				// ["name", "id", "email"]) can shift the index of the others.
				Index: int32(len(tableColumns)),
			},
		}

		tableColumns[columnName] = definition
		if isPrimaryKey {
			primaryKeysByTable[tableName] = append(primaryKeysByTable[tableName], *definition)
		}

		return true
	}

	scanErr := keyspaceClient.Open(schemaMappingTable).ReadRows(
		ctx,
		bigtable.InfiniteRange(""),
		decodeRow,
		bigtable.RowFilter(bigtable.LatestNFilter(1)),
	)

	// A failure reported by ReadRows takes precedence; otherwise surface
	// whatever stopped the callback, if anything.
	failure := rowErr
	if scanErr != nil {
		failure = scanErr
	}
	if failure != nil {
		btc.Logger.Error("Failed to read rows from Bigtable - possible issue with schema_mapping table:", zap.Error(failure))
		return nil, nil, failure
	}

	otelgo.AddAnnotation(ctx, schemaMappingConfigFetched)
	return columnsByTable, sortPkData(primaryKeysByTable), nil
}