in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler/responsehandler.go [362:554]
// BuildResponseRow converts one result row (rowMap, keyed by column name) into
// a message.Row containing one encoded cell per entry of cmd, in cmd order.
//
// Parameters:
//   - rowMap: column name -> raw value. Entries that are nil are replaced in
//     place with an empty byte slice and emitted as an empty cell.
//   - query: metadata of the executed query (keyspace, table, aliases,
//     group-by flag, protocol version).
//   - cmd: column metadata of the result set. When lastRow is true, Name/Type
//     of entries for aggregates and map-element selections are rewritten in
//     place (count columns get their Type rewritten regardless of lastRow).
//   - mapKeyArray: indexed like cmd; a non-empty entry marks a column that
//     selects a single map element (col['key']).
//   - lastRow: enables the metadata rewriting described above.
//
// It returns an error (instead of panicking) when a value does not have the
// shape required by its column, when column metadata cannot be resolved, or
// when encoding fails.
func (th *TypeHandler) BuildResponseRow(rowMap map[string]interface{}, query QueryMetadata, cmd []*message.ColumnMetadata, mapKeyArray []string, lastRow bool) (message.Row, error) {
	var mr message.Row
	for index, metaData := range cmd {
		key := metaData.Name
		if rowMap[key] == nil {
			rowMap[key] = []byte{}
			mr = append(mr, []byte{})
			continue
		}
		value := rowMap[key]
		// The alias lookup depends only on key, so do it once per column.
		aliasKey, hasAlias := query.AliasMap[key]
		col := GetQueryColumn(query, index, key)

		var (
			cqlType      string
			isCollection bool
			err          error
		)
		switch {
		case col.FuncName == "count":
			cqlType = "bigint"
		case col.IsWriteTimeColumn:
			cqlType = "timestamp"
			if hasAlias {
				value, err = unwrapAliasedValue(value, key)
			}
		case hasAlias:
			colName := aliasKey.Name
			if col.MapKey != "" {
				colName = col.MapColumnName
			}
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, colName)
			if err != nil {
				// Bail out before touching value: isCollection is not
				// meaningful when the metadata lookup failed.
				return nil, err
			}
			if !isCollection {
				value, err = unwrapAliasedValue(value, aliasKey.Alias)
			}
		case col.IsFunc:
			// The column name to look up is whatever follows the first "_" in key.
			parts := strings.SplitN(key, "_", 2)
			if len(parts) < 2 {
				return nil, fmt.Errorf("function column %q: expected a name containing '_'", key)
			}
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, parts[1])
		case col.MapKey != "":
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, col.MapColumnName)
		default:
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, key)
		}
		if err != nil {
			return nil, err
		}
		cqlType = strings.ToLower(cqlType)

		if col.IsFunc || query.IsGroupBy {
			if col.FuncName == "count" {
				cqlType = "bigint"
				metaData.Type = datatype.Bigint
				if !hasAlias && lastRow {
					metaData.Name = fmt.Sprintf("system.%s(%s)", col.FuncName, col.FuncColumnName)
				}
			}
			var dt datatype.DataType
			var val interface{}
			// Aggregate results arrive as int64 or float64 and are narrowed or
			// widened to the Go type matching the column's CQL type; values
			// that are already []byte are passed through untouched.
			switch v := value.(type) {
			case int64:
				switch cqlType {
				case "bigint":
					val = v
					dt = datatype.Bigint
				case "int":
					val = int32(v)
					dt = datatype.Int
				default:
					return nil, fmt.Errorf("invalid cqlType - value received type: %v, CqlType: %s", v, cqlType)
				}
			case float64:
				switch cqlType {
				case "bigint":
					val = int64(v)
					dt = datatype.Bigint
				case "int":
					val = int32(v)
					dt = datatype.Int
				case "float":
					val = float32(v)
					dt = datatype.Float
				case "double":
					val = v
					dt = datatype.Double
				default:
					return nil, fmt.Errorf("invalid cqlType - value recieved type: %v, CqlType: %s", v, cqlType)
				}
			case []byte:
				mr = append(mr, v)
				continue
			default:
				return nil, fmt.Errorf("unsupported value type received in Bigtable: %v, value: %v, type: %T", cqlType, value, v)
			}
			encoded, err := proxycore.EncodeType(dt, primitive.ProtocolVersion4, val)
			if err != nil {
				return nil, fmt.Errorf("failed to encode value: %v", err)
			}
			// Un-aliased aggregate columns are reported under their function-call name.
			if !hasAlias && lastRow && col.IsFunc {
				metaData.Name = fmt.Sprintf("system.%s(%s)", col.FuncName, col.FuncColumnName)
			}
			mr = append(mr, encoded)
			continue
		}

		dt, err := utilities.GetCassandraColumnType(cqlType)
		if err != nil {
			return nil, err
		}

		if !isCollection {
			value, err = HandlePrimitiveEncoding(dt, value, query.ProtocalV, true)
			if err != nil {
				return nil, err
			}
			if value == nil {
				mr = append(mr, []byte{})
				continue
			}
			cell, ok := value.([]byte)
			if !ok {
				return nil, fmt.Errorf("column %q: expected encoded value of type []byte, got %T", key, value)
			}
			mr = append(mr, cell)
			continue
		}

		// NOTE(review): a collection that is not a set/list/map, or a map whose
		// key type is neither varchar nor timestamp, adds no cell below —
		// confirm that this cannot happen for supported schemas.
		switch dt.GetDataTypeCode() {
		case primitive.DataTypeCodeSet:
			setType := dt.(datatype.SetType)
			entries, err := asMaptypeEntries(value, key)
			if err != nil {
				return nil, err
			}
			// Set members are carried in the Key field of each entry.
			setval := make([]interface{}, 0, len(entries))
			for _, entry := range entries {
				setval = append(setval, entry.Key)
			}
			if err = th.HandleSetType(setval, &mr, setType, query.ProtocalV); err != nil {
				return nil, err
			}
		case primitive.DataTypeCodeList:
			listType := dt.(datatype.ListType)
			entries, err := asMaptypeEntries(value, key)
			if err != nil {
				return nil, err
			}
			// List elements are carried in the Value field of each entry.
			listVal := make([]interface{}, 0, len(entries))
			for _, entry := range entries {
				listVal = append(listVal, entry.Value)
			}
			if err = th.HandleListType(listVal, &mr, listType, query.ProtocalV); err != nil {
				return nil, err
			}
		case primitive.DataTypeCodeMap:
			mapType := dt.(datatype.MapType)
			if col.MapKey != "" {
				if index >= len(mapKeyArray) {
					return nil, fmt.Errorf("column %q: no map key entry at index %d", key, index)
				}
				if mapKeyArray[index] != "" {
					// Single map element selected: the value is already the
					// encoded element, so emit it directly.
					if !hasAlias && lastRow {
						metaData.Name = fmt.Sprintf("%s['%s']", col.MapColumnName, col.MapKey)
					}
					if hasAlias {
						value, err = unwrapAliasedValue(value, key)
						if err != nil {
							return nil, err
						}
					}
					if lastRow {
						metaData.Type = mapType.GetValueType()
					}
					if value == nil {
						mr = append(mr, []byte{})
						continue
					}
					cell, ok := value.([]byte)
					if !ok {
						return nil, fmt.Errorf("column %q: expected map element of type []byte, got %T", key, value)
					}
					mr = append(mr, cell)
					continue
				}
			}
			entries, err := asMaptypeEntries(value, key)
			if err != nil {
				return nil, err
			}
			mapData := make(map[string]interface{}, len(entries))
			for _, entry := range entries {
				mapData[entry.Key] = entry.Value
			}
			keyType := mapType.GetKeyType()
			if keyType == datatype.Varchar {
				err = th.HandleMapType(mapData, &mr, mapType, query.ProtocalV)
			} else if keyType == datatype.Timestamp {
				err = th.HandleTimestampMap(mapData, &mr, mapType, query.ProtocalV)
			}
			if err != nil {
				th.Logger.Error("Error while Encoding json->bytes -> ", zap.Error(err))
				return nil, fmt.Errorf("failed to retrieve Map data: %v", err)
			}
		}
	}
	return mr, nil
}

// unwrapAliasedValue returns the entry stored under field in value, which
// BuildResponseRow expects to be a map[string]interface{} for aliased columns.
// The returned entry may be nil when field is absent; an error is returned
// when value is not such a map.
func unwrapAliasedValue(value interface{}, field string) (interface{}, error) {
	wrapped, ok := value.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("column %q: expected aliased value of type map[string]interface{}, got %T", field, value)
	}
	return wrapped[field], nil
}

// asMaptypeEntries asserts that value holds the []Maptype representation used
// for collection columns, returning an error naming column otherwise.
func asMaptypeEntries(value interface{}, column string) ([]Maptype, error) {
	entries, ok := value.([]Maptype)
	if !ok {
		return nil, fmt.Errorf("column %q: expected collection value of type []Maptype, got %T", column, value)
	}
	return entries, nil
}