cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/bigtable.go (681 lines of code) (raw):

/*
 * Copyright (C) 2025 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package bigtableclient

import (
	"bytes"
	"context"
	"encoding/hex"
	"fmt"
	"slices"
	"strconv"
	"strings"
	"time"

	"cloud.google.com/go/bigtable"
	"github.com/datastax/go-cassandra-native-protocol/datatype"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
	otelgo "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/otel"
	"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler"
	rh "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler"
	schemaMapping "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/schema-mapping"
	"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator"
	"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/utilities"
	"github.com/datastax/go-cassandra-native-protocol/message"
	"go.uber.org/zap"
)

// Events
const (
	applyingBigtableMutation    = "Applying Insert/Update Mutation"
	bigtableMutationApplied     = "Insert/Update Mutation Applied"
	callingBigtableSQLAPI       = "Calling Bigtable SQL API"
	bigtableSQLAPICallDone      = "Bigtable SQL API Call Done"
	applyingDeleteMutation      = "Applying Delete Mutation"
	deleteMutationApplied       = "Delete Mutation Applied"
	fetchingSchemaMappingConfig = "Fetching Schema Mapping Configurations"
	schemaMappingConfigFetched  = "Schema Mapping Configurations Fetched"
	applyingBulkMutation        = "Applying Bulk Mutation"
	bulkMutationApplied         = "Bulk Mutation Applied"

	mutationTypeInsert             = "Insert"
	mutationTypeDelete             = "Delete"
	mutationTypeDeleteColumnFamily = "DeleteColumnFamilies"
	mutationTypeUpdate             = "Update"

	schemaMappingTableColumnFamily = "cf"
)

type BigTableClientIface interface {
	ApplyBulkMutation(context.Context, string, []MutationData, string) (BulkOperationResponse, error)
	Close()
	DeleteRowNew(context.Context, *translator.DeleteQueryMapping) (*message.RowsResult, error)
	ExecuteBigtableQuery(context.Context, rh.QueryMetadata) (map[string]map[string]interface{}, error)
	GetSchemaMappingConfigs(context.Context, string, string) (map[string]map[string]*schemaMapping.Column, map[string][]schemaMapping.Column, error)
	InsertRow(context.Context, *translator.InsertQueryMapping) (*message.RowsResult, error)
	SelectStatement(context.Context, rh.QueryMetadata) (*message.RowsResult, time.Time, error)
	AlterTable(ctx context.Context, data *translator.AlterTableStatementMap, schemaMappingTableName string) error
	CreateTable(ctx context.Context, data *translator.CreateTableStatementMap, schemaMappingTableName string) error
	DropTable(ctx context.Context, data *translator.DropTableStatementMap, schemaMappingTableName string) error
	UpdateRow(context.Context, *translator.UpdateQueryMapping) (*message.RowsResult, error)
	// Functions related to updating the instance properties.
	LoadConfigs(rh *responsehandler.TypeHandler, schemaConfig *schemaMapping.SchemaMappingConfig)
}

// NewBigtableClient - Creates a new instance of BigtableClient.
//
// Parameters:
// - client: Bigtable client.
// - adminClients: Bigtable admin clients.
// - logger: Logger instance.
// - sqlClient: Bigtable SQL client.
// - config: BigtableConfig configuration object.
// - responseHandler: TypeHandler for response handling.
// - grpcConn: gRPC connection for calling APIs.
// - schemaMapping: schema mapping for response handling.
//
// Returns:
// - BigtableClient: New instance of BigtableClient
var NewBigtableClient = func(client map[string]*bigtable.Client, adminClients map[string]*bigtable.AdminClient, logger *zap.Logger, sqlClient btpb.BigtableClient, config BigtableConfig, responseHandler rh.ResponseHandlerIface, grpcConn *grpc.ClientConn, schemaMapping *schemaMapping.SchemaMappingConfig) BigTableClientIface {
	return &BigtableClient{
		Clients:             client,
		AdminClients:        adminClients,
		Logger:              logger,
		SqlClient:           sqlClient,
		BigtableConfig:      config,
		ResponseHandler:     responseHandler,
		SchemaMappingConfig: schemaMapping,
		grpcConn:            grpcConn,
	}
}

func (btc *BigtableClient) reloadSchemaMappings(ctx context.Context, keyspace, schemaMappingTableName string) error {
	tbData, pkData, err := btc.GetSchemaMappingConfigs(ctx, keyspace, schemaMappingTableName)
	if err != nil {
		return fmt.Errorf("error when reloading schema mappings for %s.%s: %w", keyspace, schemaMappingTableName, err)
	}
	if btc.SchemaMappingConfig.TablesMetaData == nil {
		btc.SchemaMappingConfig.TablesMetaData = make(map[string]map[string]map[string]*schemaMapping.Column)
	}
	btc.SchemaMappingConfig.TablesMetaData[keyspace] = tbData
	if btc.SchemaMappingConfig.PkMetadataCache == nil {
		btc.SchemaMappingConfig.PkMetadataCache = make(map[string]map[string][]schemaMapping.Column)
	}
	btc.SchemaMappingConfig.PkMetadataCache[keyspace] = pkData
	return nil
}

// scan the schema mapping table to determine if the table exists
func (btc *BigtableClient) tableSchemaExists(ctx context.Context, client *bigtable.Client, schemaMappingTableName string, tableName string) (bool, error) {
	table := client.Open(schemaMappingTableName)
	exists := false
	err := table.ReadRows(ctx, bigtable.PrefixRange(tableName+"#"), func(row bigtable.Row) bool {
		exists = true
		return false
	}, bigtable.LimitRows(1))
	return exists, err
}

func (btc *BigtableClient) tableResourceExists(ctx context.Context, adminClient *bigtable.AdminClient, table string) (bool, error) {
	_, err := adminClient.TableInfo(ctx, table)
	// todo figure out which error message is for table doesn't exist yet or find better check
	if status.Code(err) == codes.NotFound || status.Code(err) == codes.InvalidArgument {
		return false, nil
	}
	// something went wrong
	if err != nil {
		return false, err
	}
	return true, nil
}
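// Illustrative sketch (not part of this file's runtime path) of how a caller
// might wire up the client through NewBigtableClient above and issue a simple
// insert. The identifiers clientsByKeyspace, adminClientsByKeyspace, cfg,
// handler, conn, mapping, insertMapping, and ctx are hypothetical placeholders,
// not symbols defined in this package.
//
//	btClient := NewBigtableClient(clientsByKeyspace, adminClientsByKeyspace, zap.NewNop(), nil, cfg, handler, conn, mapping)
//	defer btClient.Close()
//	result, err := btClient.InsertRow(ctx, insertMapping)
//	if err != nil {
//		// handle the write failure
//	}
//	_ = result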
// mutateRow() - Applies mutations to a row in the specified Bigtable table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - tableName: Name of the table where the row exists.
// - rowKey: Row key of the row to mutate.
// - columns: Columns to mutate.
// - values: Values to set in the columns.
// - deleteColumnFamilies: Column families to delete.
//
// Returns:
// - error: Error if the mutation fails.
func (btc *BigtableClient) mutateRow(ctx context.Context, tableName, rowKey string, columns []translator.Column, values []any, deleteColumnFamilies []string, deleteQualifiers []translator.Column, timestamp bigtable.Timestamp, ifSpec translator.IfSpec, keyspace string, ComplexOperation map[string]*translator.ComplexOperation) (*message.RowsResult, error) {
	otelgo.AddAnnotation(ctx, applyingBigtableMutation)
	mut := bigtable.NewMutation()
	btc.Logger.Info("mutating row", zap.String("key", hex.EncodeToString([]byte(rowKey))))

	client, ok := btc.Clients[keyspace]
	if !ok {
		return nil, fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}
	tbl := client.Open(tableName)
	if timestamp == 0 {
		timestamp = bigtable.Timestamp(bigtable.Now().Time().UnixMicro())
	}

	// Delete column families
	for _, cf := range deleteColumnFamilies {
		mut.DeleteCellsInFamily(cf)
	}

	// Handle complex updates
	for cf, meta := range ComplexOperation {
		if meta.UpdateListIndex != "" {
			index, err := strconv.Atoi(meta.UpdateListIndex)
			if err != nil {
				return nil, err
			}
			reqTimestamp, err := btc.getIndexOpTimestamp(ctx, tableName, rowKey, cf, keyspace, index)
			if err != nil {
				return nil, err
			}
			mut.Set(cf, reqTimestamp, timestamp, meta.Value)
		}
		if meta.ListDelete {
			if err := btc.setMutationforListDelete(ctx, tableName, rowKey, cf, keyspace, meta.ListDeleteValues, mut); err != nil {
				return nil, err
			}
		}
	}

	// Delete specific column qualifiers
	for _, q := range deleteQualifiers {
		mut.DeleteCellsInColumn(q.ColumnFamily, q.Name)
	}

	// Set values for columns
	for i, column := range columns {
		if bv, ok := values[i].([]byte); ok {
			mut.Set(column.ColumnFamily, column.Name, timestamp, bv)
		} else {
			btc.Logger.Error("Value is not of type []byte", zap.String("column", column.Name), zap.Any("value", values[i]))
			return nil, fmt.Errorf("value for column %s is not of type []byte", column.Name)
		}
	}

	if ifSpec.IfExists || ifSpec.IfNotExists {
		predicateFilter := bigtable.CellsPerRowLimitFilter(1)
		matched := true
		conditionalMutation := bigtable.NewCondMutation(predicateFilter, mut, nil)
		if ifSpec.IfNotExists {
			conditionalMutation = bigtable.NewCondMutation(predicateFilter, nil, mut)
		}
		err := tbl.Apply(ctx, rowKey, conditionalMutation, bigtable.GetCondMutationResult(&matched))
		otelgo.AddAnnotation(ctx, bigtableMutationApplied)
		if err != nil {
			return nil, err
		}
		return GenerateAppliedRowsResult(keyspace, tableName, ifSpec.IfExists == matched), nil
	}

	// If no conditions, apply the mutation directly
	err := tbl.Apply(ctx, rowKey, mut)
	otelgo.AddAnnotation(ctx, bigtableMutationApplied)
	if err != nil {
		return nil, err
	}
	return &message.RowsResult{
		Metadata: &message.RowsMetadata{
			LastContinuousPage: true,
		},
	}, nil
}
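// Minimal standalone sketch of the conditional-write pattern mutateRow above
// uses for Cassandra lightweight transactions: a CellsPerRowLimitFilter(1)
// predicate detects whether the row already has any cell, and the mutation is
// attached to the "true" branch (IF EXISTS) or the "false" branch (IF NOT
// EXISTS). The column family, qualifier, row key, and value below are
// hypothetical.
//
//	mut := bigtable.NewMutation()
//	mut.Set("cf1", "col", bigtable.Now(), []byte("v"))
//	// IF NOT EXISTS: only apply mut when the predicate matches no cells.
//	cond := bigtable.NewCondMutation(bigtable.CellsPerRowLimitFilter(1), nil, mut)
//	matched := false
//	err := tbl.Apply(ctx, "row-1", cond, bigtable.GetCondMutationResult(&matched))
//	applied := err == nil && !matched // matched == true means the row already existed
//	_ = applied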
func (btc *BigtableClient) DropTable(ctx context.Context, data *translator.DropTableStatementMap, schemaMappingTableName string) error {
	client, ok := btc.Clients[data.Keyspace]
	if !ok {
		return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
	}
	adminClient, ok := btc.AdminClients[data.Keyspace]
	if !ok {
		return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
	}

	// first clean up table from schema mapping table because that's the SoT
	tbl := client.Open(schemaMappingTableName)
	var deleteMuts []*bigtable.Mutation
	var rowKeysToDelete []string
	err := tbl.ReadRows(ctx, bigtable.PrefixRange(data.Table+"#"), func(row bigtable.Row) bool {
		mut := bigtable.NewMutation()
		mut.DeleteRow()
		deleteMuts = append(deleteMuts, mut)
		rowKeysToDelete = append(rowKeysToDelete, row.Key())
		return true
	})
	if err != nil {
		return err
	}

	btc.Logger.Info("drop table: deleting schema rows")
	_, err = tbl.ApplyBulk(ctx, rowKeysToDelete, deleteMuts)
	if err != nil {
		return err
	}

	// do a read to check if the table exists to save on admin API write quota
	exists, err := btc.tableResourceExists(ctx, adminClient, data.Table)
	if err != nil {
		return err
	}
	if exists {
		btc.Logger.Info("drop table: deleting bigtable table")
		err = adminClient.DeleteTable(ctx, data.Table)
	}
	if err != nil {
		return err
	}

	// this error behavior is done independently of the table resource existing or not because the schema mapping table is the SoT, not the table resource
	if len(rowKeysToDelete) == 0 && !data.IfExists {
		return fmt.Errorf("cannot delete table %s because it does not exist", data.Table)
	}

	btc.Logger.Info("reloading schema mappings")
	err = btc.reloadSchemaMappings(ctx, data.Keyspace, schemaMappingTableName)
	if err != nil {
		return err
	}
	return nil
}

func (btc *BigtableClient) createSchemaMappingTableMaybe(ctx context.Context, keyspace, schemaMappingTableName string) error {
	btc.Logger.Info("ensuring schema mapping table exists")
	adminClient, ok := btc.AdminClients[keyspace]
	if !ok {
		return fmt.Errorf("invalid keyspace `%s`", keyspace)
	}

	// do a read to check if the table exists to save on admin API write quota
	exists, err := btc.tableResourceExists(ctx, adminClient, schemaMappingTableName)
	if err != nil {
		return err
	}
	if !exists {
		err = adminClient.CreateTable(ctx, schemaMappingTableName)
		if err != nil {
			btc.Logger.Error("failed to create schema mapping table", zap.Error(err))
			return err
		}
	}

	err = adminClient.CreateColumnFamily(ctx, schemaMappingTableName, schemaMappingTableColumnFamily)
	if status.Code(err) == codes.AlreadyExists {
		err = nil
	}
	if err != nil {
		return err
	}
	return nil
}

func (btc *BigtableClient) CreateTable(ctx context.Context, data *translator.CreateTableStatementMap, schemaMappingTableName string) error {
	client, ok := btc.Clients[data.Keyspace]
	if !ok {
		return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
	}
	adminClient, ok := btc.AdminClients[data.Keyspace]
	if !ok {
		return fmt.Errorf("invalid keyspace `%s`", data.Keyspace)
	}

	exists, err := btc.tableSchemaExists(ctx, client, schemaMappingTableName, data.Table)
	if err != nil {
		return err
	}
	if !exists {
		btc.Logger.Info("updating table schema")
		err = btc.updateTableSchema(ctx, data.Keyspace, schemaMappingTableName, data.Table, data.PrimaryKeys, data.Columns, nil)
		if err != nil {
			return err
		}
	}

	var rowKeySchemaFields []bigtable.StructField
	for _, key := range data.PrimaryKeys {
		part, err := createBigtableRowKeyField(key.Name, data.Columns, btc.BigtableConfig.EncodeIntValuesWithBigEndian)
		if err != nil {
			return err
		}
		rowKeySchemaFields = append(rowKeySchemaFields, part)
	}

	columnFamilies := make(map[string]bigtable.Family)
	for _, col := range data.Columns {
		if utilities.IsCollectionDataType(col.Type) {
			columnFamilies[col.Name] = bigtable.Family{
				GCPolicy: bigtable.MaxVersionsPolicy(1),
			}
		}
	}
	columnFamilies[btc.BigtableConfig.DefaultColumnFamily] = bigtable.Family{
		GCPolicy: bigtable.MaxVersionsPolicy(1),
	}

	btc.Logger.Info("creating bigtable table")
	err = adminClient.CreateTableFromConf(ctx, &bigtable.TableConf{
		TableID:        data.Table,
		ColumnFamilies: columnFamilies,
		RowKeySchema: &bigtable.StructType{
			Fields:   rowKeySchemaFields,
			Encoding: bigtable.StructOrderedCodeBytesEncoding{},
		},
	})
	// ignore already exists errors - the schema mapping table is the SoT
	if status.Code(err) == codes.AlreadyExists {
		err = nil
	}
	if err != nil {
		btc.Logger.Error("failed to create bigtable table", zap.Error(err))
		return err
	}

	if exists && !data.IfNotExists {
return fmt.Errorf("cannot create table %s becauase it already exists", data.Table) } btc.Logger.Info("reloading schema mappings") err = btc.reloadSchemaMappings(ctx, data.Keyspace, schemaMappingTableName) if err != nil { return err } return nil } func createBigtableRowKeyField(key string, cols []message.ColumnMetadata, encodeIntValuesWithBigEndian bool) (bigtable.StructField, error) { for _, column := range cols { if column.Name != key { continue } switch column.Type { case datatype.Varchar: return bigtable.StructField{FieldName: key, FieldType: bigtable.StringType{Encoding: bigtable.StringUtf8BytesEncoding{}}}, nil case datatype.Int, datatype.Bigint, datatype.Timestamp: // todo remove once ordered byte encoding is supported for ints if encodeIntValuesWithBigEndian { return bigtable.StructField{FieldName: key, FieldType: bigtable.Int64Type{Encoding: bigtable.BigEndianBytesEncoding{}}}, nil } return bigtable.StructField{FieldName: key, FieldType: bigtable.Int64Type{Encoding: bigtable.Int64OrderedCodeBytesEncoding{}}}, nil default: return bigtable.StructField{}, fmt.Errorf("unhandled row key type %s", column.Type) } } // this should never happen given where this is called return bigtable.StructField{}, fmt.Errorf("missing primary key `%s` from columns definition", key) } func (btc *BigtableClient) AlterTable(ctx context.Context, data *translator.AlterTableStatementMap, schemaMappingTableName string) error { adminClient, ok := btc.AdminClients[data.Keyspace] if !ok { return fmt.Errorf("invalid keyspace `%s`", data.Keyspace) } // passing nil in as pmks because we don't have access to them here and because primary keys can't be altered err := btc.updateTableSchema(ctx, data.Keyspace, schemaMappingTableName, data.Table, nil, data.AddColumns, data.DropColumns) if err != nil { return err } for _, col := range data.AddColumns { if utilities.IsCollectionDataType(col.Type) { err = adminClient.CreateColumnFamily(ctx, data.Table, col.Name) if err != nil { return err } } } btc.Logger.Info("reloading schema mappings") err = btc.reloadSchemaMappings(ctx, data.Keyspace, schemaMappingTableName) if err != nil { return err } return nil } func (btc *BigtableClient) updateTableSchema(ctx context.Context, keyspace string, schemaMappingTableName string, tableName string, pmks []translator.CreateTablePrimaryKeyConfig, addCols []message.ColumnMetadata, dropCols []string) error { client, exists := btc.Clients[keyspace] if !exists { return fmt.Errorf("invalid keyspace `%s`", keyspace) } ts := bigtable.Now() var muts []*bigtable.Mutation var rowKeys []string for _, col := range addCols { mut := bigtable.NewMutation() mut.Set(schemaMappingTableColumnFamily, "ColumnName", ts, []byte(col.Name)) mut.Set(schemaMappingTableColumnFamily, "ColumnType", ts, []byte(col.Type.String())) isCollection := utilities.IsCollectionDataType(col.Type) mut.Set(schemaMappingTableColumnFamily, "IsCollection", ts, []byte(strconv.FormatBool(isCollection))) pmkIndex := slices.IndexFunc(pmks, func(c translator.CreateTablePrimaryKeyConfig) bool { return c.Name == col.Name }) mut.Set(schemaMappingTableColumnFamily, "IsPrimaryKey", ts, []byte(strconv.FormatBool(pmkIndex != -1))) if pmkIndex != -1 { pmkConfig := pmks[pmkIndex] mut.Set(schemaMappingTableColumnFamily, "KeyType", ts, []byte(pmkConfig.KeyType)) } else { // overkill, but overwrite any previous KeyType configs which could exist if the table was recreated with different columns mut.Set(schemaMappingTableColumnFamily, "KeyType", ts, []byte("")) } mut.Set(schemaMappingTableColumnFamily, 
"PK_Precedence", ts, []byte(strconv.Itoa(pmkIndex+1))) mut.Set(schemaMappingTableColumnFamily, "TableName", ts, []byte(tableName)) muts = append(muts, mut) rowKeys = append(rowKeys, tableName+"#"+col.Name) } // note: we only remove the column from the schema mapping table and don't actually delete any data from the data table for _, col := range dropCols { mut := bigtable.NewMutation() mut.DeleteRow() muts = append(muts, mut) rowKeys = append(rowKeys, tableName+"#"+col) } btc.Logger.Info("updating schema mapping table") table := client.Open(schemaMappingTableName) _, err := table.ApplyBulk(ctx, rowKeys, muts) if err != nil { btc.Logger.Error("update schema mapping table failed") return err } return nil } // InsertRow - Inserts a row into the specified Bigtable table. // // Parameters: // - ctx: Context for the operation, used for cancellation and deadlines. // - data: InsertQueryMapping object containing the table, row key, columns, values, and deleteColumnFamilies. // // Returns: // - error: Error if the insertion fails. func (btc *BigtableClient) InsertRow(ctx context.Context, insertQueryData *translator.InsertQueryMapping) (*message.RowsResult, error) { return btc.mutateRow(ctx, insertQueryData.Table, insertQueryData.RowKey, insertQueryData.Columns, insertQueryData.Values, insertQueryData.DeleteColumnFamilies, []translator.Column{}, insertQueryData.TimestampInfo.Timestamp, translator.IfSpec{IfNotExists: insertQueryData.IfNotExists}, insertQueryData.Keyspace, nil) } // UpdateRow - Updates a row in the specified Bigtable table. // // Parameters: // - ctx: Context for the operation, used for cancellation and deadlines. // - data: UpdateQueryMapping object containing the table, row key, columns, values, and DeleteColumnFamilies. // // Returns: // - error: Error if the update fails. func (btc *BigtableClient) UpdateRow(ctx context.Context, updateQueryData *translator.UpdateQueryMapping) (*message.RowsResult, error) { return btc.mutateRow(ctx, updateQueryData.Table, updateQueryData.RowKey, updateQueryData.Columns, updateQueryData.Values, updateQueryData.DeleteColumnFamilies, updateQueryData.DeleteColumQualifires, updateQueryData.TimestampInfo.Timestamp, translator.IfSpec{IfExists: updateQueryData.IfExists}, updateQueryData.Keyspace, updateQueryData.ComplexOperation) } // DeleteRowNew - Deletes a row in the specified Bigtable table. // // Parameters: // - ctx: Context for the operation, used for cancellation and deadlines. // - queryData: DeleteQueryMapping object containing the table and row key. // // Returns: // - error: Error if the deletion fails. 
// InsertRow - Inserts a row into the specified Bigtable table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - data: InsertQueryMapping object containing the table, row key, columns, values, and deleteColumnFamilies.
//
// Returns:
// - error: Error if the insertion fails.
func (btc *BigtableClient) InsertRow(ctx context.Context, insertQueryData *translator.InsertQueryMapping) (*message.RowsResult, error) {
	return btc.mutateRow(ctx, insertQueryData.Table, insertQueryData.RowKey, insertQueryData.Columns, insertQueryData.Values, insertQueryData.DeleteColumnFamilies, []translator.Column{}, insertQueryData.TimestampInfo.Timestamp, translator.IfSpec{IfNotExists: insertQueryData.IfNotExists}, insertQueryData.Keyspace, nil)
}

// UpdateRow - Updates a row in the specified Bigtable table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - data: UpdateQueryMapping object containing the table, row key, columns, values, and DeleteColumnFamilies.
//
// Returns:
// - error: Error if the update fails.
func (btc *BigtableClient) UpdateRow(ctx context.Context, updateQueryData *translator.UpdateQueryMapping) (*message.RowsResult, error) {
	return btc.mutateRow(ctx, updateQueryData.Table, updateQueryData.RowKey, updateQueryData.Columns, updateQueryData.Values, updateQueryData.DeleteColumnFamilies, updateQueryData.DeleteColumQualifires, updateQueryData.TimestampInfo.Timestamp, translator.IfSpec{IfExists: updateQueryData.IfExists}, updateQueryData.Keyspace, updateQueryData.ComplexOperation)
}

// DeleteRowNew - Deletes a row in the specified Bigtable table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - queryData: DeleteQueryMapping object containing the table and row key.
//
// Returns:
// - error: Error if the deletion fails.
func (btc *BigtableClient) DeleteRowNew(ctx context.Context, deleteQueryData *translator.DeleteQueryMapping) (*message.RowsResult, error) {
	otelgo.AddAnnotation(ctx, applyingDeleteMutation)
	client, ok := btc.Clients[deleteQueryData.Keyspace]
	if !ok {
		return nil, fmt.Errorf("invalid keySpace - `%s`", deleteQueryData.Keyspace)
	}
	tbl := client.Open(deleteQueryData.Table)
	mut := bigtable.NewMutation()

	if len(deleteQueryData.SelectedColumns) > 0 {
		for _, v := range deleteQueryData.SelectedColumns {
			if v.ListIndex != "" {
				// handle list delete items
				index, err := strconv.Atoi(v.ListIndex)
				if err != nil {
					return nil, err
				}
				reqTimestamp, err := btc.getIndexOpTimestamp(ctx, deleteQueryData.Table, deleteQueryData.RowKey, v.Name, deleteQueryData.Keyspace, index)
				if err != nil {
					return nil, err
				}
				mut.DeleteCellsInColumn(v.Name, reqTimestamp)
			} else if v.MapKey != "" {
				// handle map delete items
				mut.DeleteCellsInColumn(v.Name, v.MapKey)
			}
		}
	} else {
		mut.DeleteRow()
	}

	if deleteQueryData.IfExists {
		predicateFilter := bigtable.CellsPerRowLimitFilter(1)
		conditionalMutation := bigtable.NewCondMutation(predicateFilter, mut, nil)
		matched := true
		if err := tbl.Apply(ctx, deleteQueryData.RowKey, conditionalMutation, bigtable.GetCondMutationResult(&matched)); err != nil {
			return nil, err
		}
		if !matched {
			return GenerateAppliedRowsResult(deleteQueryData.Keyspace, deleteQueryData.Table, false), nil
		}
		return GenerateAppliedRowsResult(deleteQueryData.Keyspace, deleteQueryData.Table, true), nil
	}
	if err := tbl.Apply(ctx, deleteQueryData.RowKey, mut); err != nil {
		return nil, err
	}

	otelgo.AddAnnotation(ctx, deleteMutationApplied)
	var response = message.RowsResult{
		Metadata: &message.RowsMetadata{
			LastContinuousPage: true,
		},
	}
	return &response, nil
}

// GetSchemaMappingConfigs - Retrieves schema mapping configurations from the specified config table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - keyspace: Keyspace whose schema mapping rows should be read.
// - schemaMappingTable: Name of the table containing the configuration.
//
// Returns:
// - map[string]map[string]*schemaMapping.Column: Table metadata.
// - map[string][]schemaMapping.Column: Primary key metadata.
// - error: Error if the retrieval fails.
func (btc *BigtableClient) GetSchemaMappingConfigs(ctx context.Context, keyspace, schemaMappingTable string) (map[string]map[string]*schemaMapping.Column, map[string][]schemaMapping.Column, error) {
	// if this is the first time we're getting table configs, ensure the schema mapping table exists
	if btc.SchemaMappingConfig == nil || len(btc.SchemaMappingConfig.TablesMetaData) == 0 {
		err := btc.createSchemaMappingTableMaybe(ctx, keyspace, schemaMappingTable)
		if err != nil {
			return nil, nil, err
		}
	}

	otelgo.AddAnnotation(ctx, fetchingSchemaMappingConfig)
	client, ok := btc.Clients[keyspace]
	if !ok {
		return nil, nil, fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}
	table := client.Open(schemaMappingTable)
	filter := bigtable.LatestNFilter(1)

	tableMetadata := make(map[string]map[string]*schemaMapping.Column)
	pkMetadata := make(map[string][]schemaMapping.Column)

	var readErr error
	err := table.ReadRows(ctx, bigtable.InfiniteRange(""), func(row bigtable.Row) bool {
		// Extract the row key and column values
		var tableName, columnName, columnType, KeyType string
		var isPrimaryKey, isCollection bool
		var pkPrecedence int

		// Extract column values
		for _, item := range row[schemaMappingTableColumnFamily] {
			switch item.Column {
			case schemaMappingTableColumnFamily + ":TableName":
				tableName = string(item.Value)
			case schemaMappingTableColumnFamily + ":ColumnName":
				columnName = string(item.Value)
			case schemaMappingTableColumnFamily + ":ColumnType":
				columnType = string(item.Value)
			case schemaMappingTableColumnFamily + ":IsPrimaryKey":
				isPrimaryKey = string(item.Value) == "true"
			case schemaMappingTableColumnFamily + ":PK_Precedence":
				pkPrecedence, readErr = strconv.Atoi(string(item.Value))
				if readErr != nil {
					return false
				}
			case schemaMappingTableColumnFamily + ":IsCollection":
				isCollection = string(item.Value) == "true"
			case schemaMappingTableColumnFamily + ":KeyType":
				KeyType = string(item.Value)
			}
		}

		cqlType, err := utilities.GetCassandraColumnType(columnType)
		if err != nil {
			readErr = err
			return false
		}
		columnMetadata := message.ColumnMetadata{
			Keyspace: keyspace,
			Table:    tableName,
			Name:     columnName,
			Type:     cqlType,
		}

		// Create a new column struct
		column := schemaMapping.Column{
			ColumnName:   columnName,
			ColumnType:   columnType,
			IsPrimaryKey: isPrimaryKey,
			PkPrecedence: pkPrecedence,
			IsCollection: isCollection,
			Metadata:     columnMetadata,
			KeyType:      KeyType,
		}

		if _, exists := tableMetadata[tableName]; !exists {
			tableMetadata[tableName] = make(map[string]*schemaMapping.Column)
		}
		if _, exists := pkMetadata[tableName]; !exists {
			var pkSlice []schemaMapping.Column
			pkMetadata[tableName] = pkSlice
		}
["name", "id", "email"]) column.Metadata.Index = int32(len(tableMetadata[tableName])) tableMetadata[tableName][column.ColumnName] = &column if column.IsPrimaryKey { pkSlice := pkMetadata[tableName] pkSlice = append(pkSlice, column) pkMetadata[tableName] = pkSlice } return true }, bigtable.RowFilter(filter)) // combine errors for simpler error handling if err == nil { err = readErr } if err != nil { btc.Logger.Error("Failed to read rows from Bigtable - possible issue with schema_mapping table:", zap.Error(err)) return nil, nil, err } otelgo.AddAnnotation(ctx, schemaMappingConfigFetched) return tableMetadata, sortPkData(pkMetadata), nil } // ApplyBulkMutation - Applies bulk mutations to the specified Bigtable table. // // Parameters: // - ctx: Context for the operation, used for cancellation and deadlines. // - tableName: Name of the table to apply bulk mutations to. // - mutationData: Slice of MutationData objects containing mutation details. // // Returns: // - BulkOperationResponse: Response indicating the result of the bulk operation. // - error: Error if the bulk mutation fails. func (btc *BigtableClient) ApplyBulkMutation(ctx context.Context, tableName string, mutationData []MutationData, keyspace string) (BulkOperationResponse, error) { rowKeyToMutationMap := make(map[string]*bigtable.Mutation) for _, md := range mutationData { btc.Logger.Info("mutating row BULK", zap.String("key", hex.EncodeToString([]byte(md.RowKey)))) if _, exists := rowKeyToMutationMap[md.RowKey]; !exists { rowKeyToMutationMap[md.RowKey] = bigtable.NewMutation() } mut := rowKeyToMutationMap[md.RowKey] switch md.MutationType { case mutationTypeInsert: { for _, column := range md.Columns { mut.Set(column.ColumnFamily, column.Name, bigtable.Now(), column.Contents) } } case mutationTypeDelete: { mut.DeleteRow() } case mutationTypeDeleteColumnFamily: { mut.DeleteCellsInFamily(md.ColumnFamily) } case mutationTypeUpdate: { for _, column := range md.Columns { mut.Set(column.ColumnFamily, column.Name, bigtable.Now(), column.Contents) } } default: return BulkOperationResponse{ FailedRows: "", }, fmt.Errorf("invalid mutation type `%s`", md.MutationType) } } // create mutations from mutation data var mutations []*bigtable.Mutation var rowKeys []string for key, mutation := range rowKeyToMutationMap { mutations = append(mutations, mutation) rowKeys = append(rowKeys, key) } otelgo.AddAnnotation(ctx, applyingBulkMutation) client, ok := btc.Clients[keyspace] if !ok { return BulkOperationResponse{ FailedRows: "All Rows are failed", }, fmt.Errorf("invalid keySpace - `%s`", keyspace) } tbl := client.Open(tableName) errs, err := tbl.ApplyBulk(ctx, rowKeys, mutations) if err != nil { return BulkOperationResponse{ FailedRows: "All Rows are failed", }, fmt.Errorf("ApplyBulk: %w", err) } var failedRows []string for i, e := range errs { if e != nil { failedRows = append(failedRows, rowKeys[i]) } } var res BulkOperationResponse if len(failedRows) > 0 { res = BulkOperationResponse{ FailedRows: fmt.Sprintf("failed rowkeys: %v", failedRows), } } else { res = BulkOperationResponse{ FailedRows: "", } } otelgo.AddAnnotation(ctx, bulkMutationApplied) return res, nil } // Close() gracefully shuts down the Bigtable client and gRPC connection. // // It iterates through all active Bigtable clients and closes them before closing the gRPC connection. 
// Close() gracefully shuts down the Bigtable client and gRPC connection.
//
// It iterates through all active Bigtable clients and closes them before closing the gRPC connection.
func (btc *BigtableClient) Close() {
	for _, clients := range btc.Clients {
		clients.Close()
	}
	btc.grpcConn.Close()
}

// getIndexOpTimestamp() retrieves the timestamp qualifier for a given list index in a Bigtable row.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - tableName: The name of the Bigtable table.
// - rowKey: The row key to look up.
// - columnFamily: The column family where the list is stored.
// - keyspace: The keyspace identifier.
// - index: The index of the list element for which the timestamp is required.
//
// Returns:
// - string: The timestamp qualifier if found.
// - error: An error if the row does not exist, the index is out of bounds, or any other retrieval failure occurs.
func (btc *BigtableClient) getIndexOpTimestamp(ctx context.Context, tableName, rowKey, columnFamily, keyspace string, index int) (string, error) {
	client, ok := btc.Clients[keyspace]
	if !ok {
		return "", fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}
	tbl := client.Open(tableName)
	row, err := tbl.ReadRow(ctx, rowKey, bigtable.RowFilter(bigtable.ChainFilters(
		bigtable.FamilyFilter(columnFamily),
		bigtable.LatestNFilter(1), // this filter is so that we fetch only the latest timestamp value
	)))
	if err != nil {
		btc.Logger.Error("Failed to read row", zap.String("RowKey", rowKey), zap.Error(err))
		return "", err
	}
	if len(row[columnFamily]) <= 0 {
		return "", fmt.Errorf("no values found in list %s", columnFamily)
	}
	for i, item := range row[columnFamily] {
		if i == index {
			splits := strings.Split(item.Column, ":")
			qualifier := splits[1]
			return qualifier, nil
		}
	}
	return "", fmt.Errorf("index %d out of bounds for list size %d", index, len(row[columnFamily]))
}
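// Illustrative picture of the cell layout getIndexOpTimestamp above relies on:
// each Cassandra list column gets its own column family, and each list element
// is a separate qualifier (shown here as microsecond timestamps). Resolving
// "list[1]" means walking the qualifiers in the order Bigtable returns them and
// returning the qualifier at that position. The family name "tags", the row
// key, and the values are hypothetical.
//
//	column family "tags" for row "row-1":
//	  tags:1718000000000001 = "red"    // index 0
//	  tags:1718000000000002 = "green"  // index 1 -> qualifier "1718000000000002"
//	  tags:1718000000000003 = "blue"   // index 2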
// setMutationforListDelete() applies deletions to specific list elements in a Bigtable row.
//
// This function identifies and removes the specified elements from a list stored in a column family.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - tableName: The name of the Bigtable table.
// - rowKey: The row key containing the list.
// - columnFamily: The column family storing the list.
// - keyspace: The keyspace identifier.
// - deleteList: A slice of byte arrays representing the values to be deleted from the list.
// - mut: The Bigtable mutation object to which delete operations will be added.
//
// Returns:
// - error: An error if the row does not exist or if list elements cannot be deleted.
func (btc *BigtableClient) setMutationforListDelete(ctx context.Context, tableName, rowKey, columnFamily, keyspace string, deleteList [][]byte, mut *bigtable.Mutation) error {
	client, ok := btc.Clients[keyspace]
	if !ok {
		return fmt.Errorf("invalid keySpace - `%s`", keyspace)
	}
	tbl := client.Open(tableName)
	row, err := tbl.ReadRow(ctx, rowKey, bigtable.RowFilter(bigtable.ChainFilters(
		bigtable.FamilyFilter(columnFamily),
		bigtable.LatestNFilter(1), // this filter is so that we fetch only the latest timestamp value
	)))
	if err != nil {
		btc.Logger.Error("Failed to read row", zap.String("RowKey", rowKey), zap.Error(err))
		return err
	}
	if len(row[columnFamily]) <= 0 {
		return fmt.Errorf("no values found in list %s", columnFamily)
	}
	for _, item := range row[columnFamily] {
		for _, dItem := range deleteList {
			if bytes.Equal(dItem, item.Value) {
				splits := strings.Split(item.Column, ":")
				qualifier := splits[1]
				mut.DeleteCellsInColumn(columnFamily, qualifier)
			}
		}
	}
	return nil
}

// LoadConfigs initializes the BigtableClient with the provided response handler
// and schema mapping configuration.
func (btc *BigtableClient) LoadConfigs(rh *responsehandler.TypeHandler, schemaConfig *schemaMapping.SchemaMappingConfig) {
	// Set the ResponseHandler for the BigtableClient to handle responses.
	btc.ResponseHandler = rh
	// Set the SchemaMappingConfig to define how data mapping will be handled in Bigtable.
	btc.SchemaMappingConfig = schemaConfig
}
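// Illustrative sketch of swapping in a freshly built response handler and
// schema mapping via LoadConfigs above, for example after schema mapping rows
// have been rebuilt. The newHandler and newMapping values are hypothetical.
//
//	btc.LoadConfigs(newHandler, newMapping)
//	// subsequent reads and writes use the updated schema mapping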