cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/select_operation.go (175 lines of code) (raw):

/* * Copyright (C) 2025 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); you may not * use this file except in compliance with the License. You may obtain a copy of * the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the * License for the specific language governing permissions and limitations under * the License. */ package bigtableclient import ( "context" "fmt" "io" "net/url" "reflect" "strconv" "time" btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb" rh "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler" "github.com/datastax/go-cassandra-native-protocol/message" "go.uber.org/zap" "google.golang.org/grpc/metadata" ) // SelectStatement - Executes a select statement on Bigtable and returns the result. // // Parameters: // - ctx: Context for the operation, used for cancellation and deadlines. // - query: rh.QueryMetadata containing the query and parameters. // // Returns: // - *message.RowsResult: The result of the select statement. // - time.Duration: The total elapsed time for the operation. // - error: Error if the select statement execution fails. func (btc *BigtableClient) SelectStatement(ctx context.Context, query rh.QueryMetadata) (*message.RowsResult, time.Time, error) { var data message.RowSet var bigtableEnd time.Time rowMap, err := btc.ExecuteBigtableQuery(ctx, query) bigtableEnd = time.Now() if err != nil { return nil, bigtableEnd, err } columnMetadata, mapKeyArray, err := btc.ResponseHandler.BuildMetadata(rowMap, query) if err != nil { return nil, bigtableEnd, err } for i := range len(rowMap) { lastRow := i == len(rowMap)-1 mr, err := btc.ResponseHandler.BuildResponseRow(rowMap[strconv.Itoa(i)], query, columnMetadata, mapKeyArray, lastRow) if err != nil { return nil, bigtableEnd, err } if len(mr) != 0 { data = append(data, mr) } } if len(columnMetadata) == 0 { //TODO Ensure ColumnMetadata returned is in same order as columns in table columnMetadata, err = btc.SchemaMappingConfig.GetMetadataForSelectedColumns(query.TableName, query.SelectedColumns, query.KeyspaceName) if err != nil { btc.Logger.Error("error while fetching columnMetadata from config -", zap.Error(err)) return nil, bigtableEnd, err } } result := &message.RowsResult{ Metadata: &message.RowsMetadata{ ColumnCount: int32(len(columnMetadata)), Columns: columnMetadata, }, Data: data, } return result, bigtableEnd, nil } // ExecuteBigtableQuery() - Executes a Bigtable query using the provided context and query metadata. // // Parameters: // - ctx: Context for the operation, used for cancellation and deadlines. // - query: rh.QueryMetadata containing the query and parameters. // // Returns: // - map[string]map[string]interface{}: Retrieved rows from the Bigtable query mapped by row keys. // - error: Error if the query execution fails. func (btc *BigtableClient) ExecuteBigtableQuery(ctx context.Context, query rh.QueryMetadata) (map[string]map[string]interface{}, error) { _, ok := btc.Clients[query.KeyspaceName] if !ok { return nil, fmt.Errorf("invalid keySpace") } var instanceName string = fmt.Sprintf("projects/%s/instances/%s", btc.BigtableConfig.GCPProjectID, query.KeyspaceName) var appProfileId string = GetProfileId(btc.BigtableConfig.AppProfileID) // Construct the x-goog-request-params header paramHeaders := fmt.Sprintf("name=%s&app_profile_id=%s", url.QueryEscape(instanceName), appProfileId) md := metadata.Pairs("x-goog-request-params", paramHeaders) ctxMD := metadata.NewOutgoingContext(ctx, md) newParams, err := constructRequestParams(query.Params) if err != nil { return nil, fmt.Errorf("error constructing params: %v", err) } req := &btpb.ExecuteQueryRequest{ InstanceName: instanceName, Query: query.Query, DataFormat: &btpb.ExecuteQueryRequest_ProtoFormat{ ProtoFormat: &btpb.ProtoFormat{}, }, Params: newParams, } btc.Logger.Info("ExecuteQuery", zap.Any("req", req)) // Call the gRPC method using the context with metadata stream, err := btc.SqlClient.ExecuteQuery(ctxMD, req) if err != nil { return nil, fmt.Errorf("could not execute query: %v", err) } var rowMapData = make(map[string]map[string]interface{}) var rowCount = 0 var cfs []*btpb.ColumnMetadata for { resp, err := stream.Recv() if err == io.EOF { return rowMapData, nil } if err != nil { return nil, err } switch r := resp.Response.(type) { case *btpb.ExecuteQueryResponse_Metadata: cfs = resp.GetMetadata().GetProtoSchema().GetColumns() case *btpb.ExecuteQueryResponse_Results: rowMapData, err = btc.ResponseHandler.GetRows(r, cfs, query, &rowCount, rowMapData) btc.Logger.Info("ExecuteQueryResponse_Results", zap.Any("rowMapData", rowMapData)) if err != nil { return nil, err } default: return nil, fmt.Errorf("unknown response type") } } } // constructRequestValues - Constructs a btpb.Value based on the type of the input value. // // Parameters: // - value: interface{} representing the input value. // // Returns: // - *btpb.Value: Constructed btpb.Value suitable for Bigtable requests. // - error: Error if the value type is unsupported. func constructRequestValues(value interface{}) (*btpb.Value, error) { switch v := value.(type) { case string: return &btpb.Value{ Kind: &btpb.Value_StringValue{StringValue: v}, Type: &btpb.Type{Kind: &btpb.Type_StringType{}}, }, nil case int32: return &btpb.Value{ Kind: &btpb.Value_IntValue{IntValue: int64(v)}, Type: &btpb.Type{Kind: &btpb.Type_Int64Type{}}, }, nil case int64: return &btpb.Value{ Kind: &btpb.Value_IntValue{IntValue: v}, Type: &btpb.Type{Kind: &btpb.Type_Int64Type{}}, }, nil case float64: return &btpb.Value{ Kind: &btpb.Value_FloatValue{FloatValue: v}, Type: &btpb.Type{Kind: &btpb.Type_Float64Type{}}, }, nil case float32: return &btpb.Value{ Kind: &btpb.Value_FloatValue{FloatValue: float64(v)}, Type: &btpb.Type{Kind: &btpb.Type_Float32Type{}}, }, nil default: val := reflect.ValueOf(value) // Return early if value is not a slice if val.Kind() != reflect.Slice { return nil, fmt.Errorf("unsupported type: %T", value) } // Return early if slice is empty if val.Len() == 0 { return &btpb.Value{ Kind: &btpb.Value_ArrayValue{ArrayValue: &btpb.ArrayValue{Values: []*btpb.Value{}}}, Type: &btpb.Type{Kind: &btpb.Type_ArrayType{ArrayType: &btpb.Type_Array{ElementType: nil}}}, }, nil } // Process array values arrayValues := make([]*btpb.Value, val.Len()) var elementType *btpb.Type for i := 0; i < val.Len(); i++ { elem := val.Index(i).Interface() btpbValue, err := constructRequestValues(elem) if err != nil { return nil, fmt.Errorf("unsupported element type in array: %v", err) } // Ensure homogeneous array if elementType == nil { elementType = btpbValue.Type } else if !reflect.DeepEqual(elementType, btpbValue.Type) { return nil, fmt.Errorf("heterogeneous array detected: elements must be of the same type") } arrayValues[i] = btpbValue } return &btpb.Value{ Kind: &btpb.Value_ArrayValue{ArrayValue: &btpb.ArrayValue{Values: arrayValues}}, Type: &btpb.Type{Kind: &btpb.Type_ArrayType{ArrayType: &btpb.Type_Array{ElementType: elementType}}}, }, nil } } // constructRequestParams - Transforms a map of input parameters into a map of btpb.Value suitable for Bigtable requests. // // Parameters: // - inputParams: map[string]interface{} containing the input parameters. // // Returns: // - map[string]*btpb.Value: Transformed parameters suitable for Bigtable requests. // - error: Error if any value type is unsupported. func constructRequestParams(inputParams map[string]interface{}) (map[string]*btpb.Value, error) { newParams := make(map[string]*btpb.Value) for key, value := range inputParams { btpbValue, err := constructRequestValues(value) if err != nil { return nil, fmt.Errorf("unsupported type for key %s: %v", key, err) } newParams[key] = btpbValue } return newParams, nil }