func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/bigtable/select_operation.go [101:160]


// ExecuteBigtableQuery sends query.Query (with query.Params) to the Bigtable
// instance named by query.KeyspaceName using the streaming ExecuteQuery RPC and
// returns the row map accumulated by btc.ResponseHandler.GetRows.
//
// The keyspace must have an entry in btc.Clients; otherwise an
// "invalid keySpace" error is returned before any RPC is made. The instance
// name and app profile are sent in the x-goog-request-params header.
//
// On any failure (parameter construction, starting the RPC, receiving from the
// stream, decoding a result chunk, or an unrecognized response type) the
// returned map is nil and the error is non-nil. The stream's context is
// cancelled when the function returns, so an early return does not leave the
// server stream open.
func (btc *BigtableClient) ExecuteBigtableQuery(ctx context.Context, query rh.QueryMetadata) (map[string]map[string]interface{}, error) {
	if _, ok := btc.Clients[query.KeyspaceName]; !ok {
		return nil, fmt.Errorf("invalid keySpace")
	}

	instanceName := fmt.Sprintf("projects/%s/instances/%s", btc.BigtableConfig.GCPProjectID, query.KeyspaceName)
	appProfileID := GetProfileId(btc.BigtableConfig.AppProfileID)

	// Construct the x-goog-request-params header. Both values are escaped
	// because the header is a URL-encoded key/value list.
	paramHeaders := fmt.Sprintf("name=%s&app_profile_id=%s", url.QueryEscape(instanceName), url.QueryEscape(appProfileID))
	md := metadata.Pairs("x-goog-request-params", paramHeaders)

	newParams, err := constructRequestParams(query.Params)
	if err != nil {
		return nil, fmt.Errorf("error constructing params: %w", err)
	}

	req := &btpb.ExecuteQueryRequest{
		InstanceName: instanceName,
		Query:        query.Query,
		DataFormat: &btpb.ExecuteQueryRequest_ProtoFormat{
			ProtoFormat: &btpb.ProtoFormat{},
		},
		Params: newParams,
	}
	btc.Logger.Info("ExecuteQuery", zap.Any("req", req))

	// A gRPC client stream is only released once it has been read to
	// completion or its context is cancelled. Several paths below return
	// before io.EOF, so tie the stream to a context that is always cancelled
	// on exit.
	streamCtx, cancel := context.WithCancel(metadata.NewOutgoingContext(ctx, md))
	defer cancel()

	stream, err := btc.SqlClient.ExecuteQuery(streamCtx, req)
	if err != nil {
		return nil, fmt.Errorf("could not execute query: %w", err)
	}

	rowMapData := make(map[string]map[string]interface{})
	rowCount := 0
	// Column metadata taken from the metadata response and passed to GetRows
	// for every subsequent results response.
	var cfs []*btpb.ColumnMetadata
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			// Stream fully consumed: everything accumulated so far is the result.
			return rowMapData, nil
		}
		if err != nil {
			return nil, err
		}
		switch r := resp.Response.(type) {
		case *btpb.ExecuteQueryResponse_Metadata:
			cfs = resp.GetMetadata().GetProtoSchema().GetColumns()
		case *btpb.ExecuteQueryResponse_Results:
			rowMapData, err = btc.ResponseHandler.GetRows(r, cfs, query, &rowCount, rowMapData)
			if err != nil {
				return nil, err
			}
			// Log only after a successful decode so failed chunks are not
			// reported as results.
			btc.Logger.Info("ExecuteQueryResponse_Results", zap.Any("rowMapData", rowMapData))
		default:
			return nil, fmt.Errorf("unknown response type")
		}
	}
}