cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/schema-mapping/schema_mapping.go (210 lines of code) (raw):

/* * Copyright (C) 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ package schemaMapping import ( "fmt" "slices" "github.com/datastax/go-cassandra-native-protocol/datatype" "github.com/datastax/go-cassandra-native-protocol/message" "go.uber.org/zap" ) const ( LimitValue = "limitValue" ) type Column struct { CQLType string ColumnName string ColumnType string IsPrimaryKey bool PkPrecedence int IsCollection bool KeyType string Metadata message.ColumnMetadata } type SchemaMappingConfig struct { Logger *zap.Logger TablesMetaData map[string]map[string]map[string]*Column PkMetadataCache map[string]map[string][]Column SystemColumnFamily string } type SelectedColumns struct { FormattedColumn string Name string IsFunc bool IsAs bool FuncName string Alias string MapKey string ListIndex string FuncColumnName string MapColumnName string KeyType string IsWriteTimeColumn bool } // GetPkByTableName finds the primary key columns of a specified table in a given keyspace. // // This method looks up the cached primary key metadata and returns the relevant columns. // // Parameters: // - keySpace: The name of the keyspace where the table resides. // - tableName: The name of the table for which primary key metadata is requested. // // Returns: // - []Column: A slice of Column structs representing the primary keys of the table. // - error: Returns an error if the primary key metadata is not found. func (c *SchemaMappingConfig) GetPkByTableName(tableName string, keySpace string) ([]Column, error) { pkMeta, ok := c.PkMetadataCache[keySpace][tableName] if !ok { return nil, fmt.Errorf("could not find metadata for the table: %s", tableName) } return pkMeta, nil } func (c *SchemaMappingConfig) GetPkByTableNameWithFilter(tableName string, keySpace string, filterPrimaryKeys []string) ([]Column, error) { pmks, err := c.GetPkByTableName(tableName, keySpace) if err != nil { return nil, err } var result []Column for _, pmk := range pmks { if slices.Contains(filterPrimaryKeys, pmk.ColumnName) { result = append(result, pmk) } } return result, nil } // GetColumnType retrieves the metadata for a specified column in a given table and keyspace. // // This method is part of the SchemaMappingConfig struct and is responsible for mapping // column types from Cassandra (CQL) to Google Cloud Bigtable. // // Parameters: // - keyspace: The name of the keyspace where the table resides. // - tableName: The name of the table containing the column. // - columnName: The name of the column for which metadata is retrieved. // // Returns: // - A pointer to a ColumnType struct containing the column's CQL type, whether it's a collection, // whether it's a primary key, and its key type. // - An error if the table or column metadata is not found. func (c *SchemaMappingConfig) GetColumnType(keyspace, tableName, columnName string) (*Column, error) { td, ok := c.TablesMetaData[keyspace][tableName] if !ok { return nil, fmt.Errorf("could not find metadata for the table: %s", tableName) } col, ok := td[columnName] if !ok { return nil, fmt.Errorf("could not find column %s metadata for the table: %s", columnName, tableName) } return &Column{ CQLType: col.ColumnType, IsPrimaryKey: col.IsPrimaryKey, IsCollection: col.IsCollection, KeyType: col.KeyType, }, nil } // GetMetadataForColumns() retrieves metadata for specific columns in a given table. // This method is a part of the SchemaMappingConfig struct. // // Parameters: // - tableName: The name of the table for which column metadata is being requested. // - columnNames(optional):Accepts nil if no columnNames provided or else A slice of strings containing the names of the columns for which // metadata is required. If this slice is empty, metadata for all // columns in the table will be returned. // // Returns: // - A slice of pointers to ColumnMetadata structs containing metadata for each requested column. // - An error if the specified table is not found in the TablesMetaData. func (c *SchemaMappingConfig) GetMetadataForColumns(keySpace, tableName string, columnNames []string) ([]*message.ColumnMetadata, error) { columnsMap, ok := c.TablesMetaData[keySpace][tableName] if !ok { err := fmt.Errorf("could not find metadata for the table: %s", tableName) c.Logger.Error(err.Error()) return nil, err } if len(columnNames) == 0 { return c.getAllColumnsMetadata(columnsMap), nil } ff, errr := c.getSpecificColumnsMetadata(columnsMap, columnNames, tableName) return ff, errr } // GetMetadataForSelectedColumns retrieves metadata for specified columns in a given table. // This method fetches metadata for the selected columns from the schema mapping configuration. // If no columns are specified, metadata for all columns in the table is returned. // // Parameters: // - tableName: The name of the table for which column metadata is being requested. // - keySpace: The keyspace where the table resides. // - columnNames: A slice of SelectedColumns specifying the columns for which metadata is required. // If nil or empty, metadata for all columns in the table is returned. // // Returns: // - []*message.ColumnMetadata: A slice of pointers to ColumnMetadata structs containing metadata // for each requested column. // - error: Returns an error if the specified table is not found in TablesMetaData. func (c *SchemaMappingConfig) GetMetadataForSelectedColumns(tableName string, columnNames []SelectedColumns, keySpace string) ([]*message.ColumnMetadata, error) { columnsMap, ok := c.TablesMetaData[keySpace][tableName] if !ok { err := fmt.Errorf("could not find metadata for the table: %s", tableName) c.Logger.Error(err.Error()) return nil, err } if len(columnNames) == 0 { return c.getAllColumnsMetadata(columnsMap), nil } return c.getSpecificColumnsMetadataForSelectedColumns(columnsMap, columnNames, tableName) } // getTimestampColumnName() constructs the appropriate name for a column used in a writetime function. // // Parameters: // - aliasName: A string representing the alias of the column, if any. // - columnName: The actual name of the column for which the writetime function is being constructed. // // Returns: A string which is either the alias name or the expression "writetime(columnName)" if no alias is provided. func getTimestampColumnName(aliasName string, columnName string) string { if aliasName == "" { return "writetime(" + columnName + ")" } return aliasName } // getSpecificColumnsMetadataForSelectedColumns() generates column metadata for specifically selected columns. // It handles regular columns, writetime columns, and special columns, and logs an error for invalid configurations. // // Parameters: // - columnsMap: A map where the keys are column names and the values are pointers to Column containing metadata. // - selectedColumns: A slice of SelectedColumns representing columns that have been selected for query. // - tableName: The name of the table from which columns are being selected. // // Returns: // - A slice of pointers to ColumnMetadata, representing the metadata for each selected column. // - An error if a column cannot be found in the map, or if there's an issue handling special columns. func (c *SchemaMappingConfig) getSpecificColumnsMetadataForSelectedColumns(columnsMap map[string]*Column, selectedColumns []SelectedColumns, tableName string) ([]*message.ColumnMetadata, error) { var columnMetadataList []*message.ColumnMetadata var columnName string for i, columnMeta := range selectedColumns { columnName = columnMeta.Name if column, ok := columnsMap[columnName]; ok { columnMetadataList = append(columnMetadataList, c.cloneColumnMetadata(&column.Metadata, int32(i))) } else if columnMeta.IsWriteTimeColumn { metadata, err := c.handleSpecialColumn(columnsMap, getTimestampColumnName(columnMeta.Alias, columnMeta.FuncColumnName), int32(i), true) if err != nil { return nil, err } columnMetadataList = append(columnMetadataList, metadata) } else if isSpecialColumn(columnName) { metadata, err := c.handleSpecialColumn(columnsMap, columnName, int32(i), false) if err != nil { return nil, err } columnMetadataList = append(columnMetadataList, metadata) } else if columnMeta.IsFunc { c.Logger.Debug("Identified a function call", zap.String("columnName", columnName)) } else { errMsg := fmt.Sprintf("metadata not found for the `%s` column in `%s`table", columnName, tableName) c.Logger.Error(errMsg) return nil, fmt.Errorf("%s", errMsg) } } return columnMetadataList, nil } // getAllColumnsMetadata() retrieves metadata for all columns in a given table. // // Parameters: // - columnsMap: column info for a given table. // // Returns: // - A slice of pointers to ColumnMetadata structs containing metadata for each requested column. // - An error func (c *SchemaMappingConfig) getAllColumnsMetadata(columnsMap map[string]*Column) []*message.ColumnMetadata { var columnMetadataList []*message.ColumnMetadata var i int32 = 0 for _, column := range columnsMap { columnMetadataList = append(columnMetadataList, c.cloneColumnMetadata(&column.Metadata, int32(i))) i++ } return columnMetadataList } // getSpecificColumnsMetadata() retrieves metadata for specific columns in a given table. // // Parameters: // - columnsMap: column info for a given table. // - columnNames: column names for which the metadata is required. // - tableName: name of the table // // Returns: // - A slice of pointers to ColumnMetadata structs containing metadata for each requested column. // - An error func (c *SchemaMappingConfig) getSpecificColumnsMetadata(columnsMap map[string]*Column, columnNames []string, tableName string) ([]*message.ColumnMetadata, error) { var columnMetadataList []*message.ColumnMetadata for i, columnName := range columnNames { if column, ok := columnsMap[columnName]; ok { columnMetadataList = append(columnMetadataList, c.cloneColumnMetadata(&column.Metadata, int32(i))) } else if isSpecialColumn(columnName) { metadata, err := c.handleSpecialColumn(columnsMap, columnName, int32(i), false) if err != nil { return nil, err } columnMetadataList = append(columnMetadataList, metadata) } else { errMsg := fmt.Sprintf("metadata not found for the `%s` column in `%s`table", columnName, tableName) c.Logger.Error(errMsg) return nil, fmt.Errorf("%s", errMsg) } } return columnMetadataList, nil } // handleSpecialColumn() retrieves metadata for special columns. // // Parameters: // - columnsMap: column info for a given table. // - columnName: column name for which the metadata is required. // - index: Index for the column // // Returns: // - Pointers to ColumnMetadata structs containing metadata for each requested column. // - An error func (c *SchemaMappingConfig) handleSpecialColumn(columnsMap map[string]*Column, columnName string, index int32, isWriteTimeFunction bool) (*message.ColumnMetadata, error) { // Validate if the column is a special column if !isSpecialColumn(columnName) && !isWriteTimeFunction { return nil, fmt.Errorf("invalid special column: %s", columnName) } // Retrieve the first available column in the map var columnMd *message.ColumnMetadata for _, column := range columnsMap { columnMd = column.Metadata.Clone() columnMd.Index = index columnMd.Name = columnName columnMd.Type = datatype.Bigint break } // No matching column found if columnMd == nil { return nil, fmt.Errorf("special column %s not found in provided metadata", columnName) } return columnMd, nil } // cloneColumnMetadata() clones the metadata from cache. // // Parameters: // - metadata: Column metadata from cache // - index: Index for the column // // Returns: // - Pointers to ColumnMetadata structs containing metadata for each requested column. func (c *SchemaMappingConfig) cloneColumnMetadata(metadata *message.ColumnMetadata, index int32) *message.ColumnMetadata { columnMd := metadata.Clone() columnMd.Index = index return columnMd } // isSpecialColumn() to check if its a special column. // // Parameters: // - columnName: name of special column // // Returns: // - boolean func isSpecialColumn(columnName string) bool { return columnName == LimitValue } // InstanceExists checks if a given keyspace exists in the schema mapping configuration. // // Parameters: // - keyspace: The name of the keyspace to check // // Returns: // - bool: true if the keyspace exists, false otherwise func (c *SchemaMappingConfig) InstanceExists(keyspace string) bool { _, ok := c.TablesMetaData[keyspace] return ok } // TableExist checks if a given table exists within a specified keyspace in the schema mapping configuration. // // Parameters: // - keyspace: The name of the keyspace containing the table // - tableName: The name of the table to check // // Returns: // - bool: true if the table exists in the specified keyspace, false otherwise func (c *SchemaMappingConfig) TableExist(keyspace string, tableName string) bool { _, ok := c.TablesMetaData[keyspace][tableName] return ok } // GetPkKeyType() returns the key type of a primary key column for a given table and keyspace. // It takes the table name, keyspace name, and column name as input parameters. // Returns the key type as a string if the column is a primary key, or an error if: // - There's an error retrieving primary key information // - The specified column is not a primary key in the table func (c *SchemaMappingConfig) GetPkKeyType(tableName string, keySpace string, columnName string) (string, error) { pkColumns, err := c.GetPkByTableName(tableName, keySpace) if err != nil { return "", err } for _, col := range pkColumns { if col.ColumnName == columnName { return col.KeyType, nil } } return "", fmt.Errorf("column %s is not a primary key in table %s", columnName, tableName) }