in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/select_operation.go [101:160]
func (btc *BigtableClient) ExecuteBigtableQuery(ctx context.Context, query rh.QueryMetadata) (map[string]map[string]interface{}, error) {
_, ok := btc.Clients[query.KeyspaceName]
if !ok {
return nil, fmt.Errorf("invalid keySpace")
}
var instanceName string = fmt.Sprintf("projects/%s/instances/%s", btc.BigtableConfig.GCPProjectID, query.KeyspaceName)
var appProfileId string = GetProfileId(btc.BigtableConfig.AppProfileID)
// Construct the x-goog-request-params header
paramHeaders := fmt.Sprintf("name=%s&app_profile_id=%s", url.QueryEscape(instanceName), appProfileId)
md := metadata.Pairs("x-goog-request-params", paramHeaders)
ctxMD := metadata.NewOutgoingContext(ctx, md)
newParams, err := constructRequestParams(query.Params)
if err != nil {
return nil, fmt.Errorf("error constructing params: %v", err)
}
req := &btpb.ExecuteQueryRequest{
InstanceName: instanceName,
Query: query.Query,
DataFormat: &btpb.ExecuteQueryRequest_ProtoFormat{
ProtoFormat: &btpb.ProtoFormat{},
},
Params: newParams,
}
btc.Logger.Info("ExecuteQuery", zap.Any("req", req))
// Call the gRPC method using the context with metadata
stream, err := btc.SqlClient.ExecuteQuery(ctxMD, req)
if err != nil {
return nil, fmt.Errorf("could not execute query: %v", err)
}
var rowMapData = make(map[string]map[string]interface{})
var rowCount = 0
var cfs []*btpb.ColumnMetadata
for {
resp, err := stream.Recv()
if err == io.EOF {
return rowMapData, nil
}
if err != nil {
return nil, err
}
switch r := resp.Response.(type) {
case *btpb.ExecuteQueryResponse_Metadata:
cfs = resp.GetMetadata().GetProtoSchema().GetColumns()
case *btpb.ExecuteQueryResponse_Results:
rowMapData, err = btc.ResponseHandler.GetRows(r, cfs, query, &rowCount, rowMapData)
btc.Logger.Info("ExecuteQueryResponse_Results", zap.Any("rowMapData", rowMapData))
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unknown response type")
}
}
}