in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler/responsehandler.go [154:261]
func (th *TypeHandler) ProcessArray(value *btpb.Value, rowMap *map[string]interface{}, cfName string, query QueryMetadata, cn string, isWritetime bool, isAggregate bool) {
if IsArrayType(value) {
arr := value.GetArrayValue().Values
length := len(arr)
var key string
var byteValue []byte
for i, v := range arr {
if IsArrayType(v) {
th.ProcessArray(v, rowMap, cfName, query, cn, isWritetime, isAggregate)
} else if length == 2 && i == 0 {
key = string(v.GetBytesValue())
} else if length == 2 && i == 1 {
byteValue = v.GetBytesValue()
}
}
if key != "" {
if cfName != th.SchemaMappingConfig.SystemColumnFamily {
keyValMap := make(map[string]interface{})
keyValMap[key] = byteValue
if (*rowMap)[cfName] == nil {
(*rowMap)[cfName] = make([]Maptype, 0)
}
if existingMap, ok := (*rowMap)[cfName].([]Maptype); ok {
for k, v := range keyValMap {
existingMap = append(existingMap, Maptype{Key: k, Value: v})
}
(*rowMap)[cfName] = existingMap
} else {
(*rowMap)[cfName] = keyValMap
}
} else {
(*rowMap)[key] = byteValue
}
}
} else if slices.Contains(query.PrimaryKeys, cfName) {
(*rowMap)[cfName] = extractValue(value)
} else {
cf := func() string {
if HasDollarSymbolPrefix(cfName) {
return query.DefaultColumnFamily
}
return cfName
}()
// primary key results don't look like they're part of a column family so handle them separately
if cf != th.SchemaMappingConfig.SystemColumnFamily {
keyValMap := make(map[string]interface{})
if isWritetime {
timestamp := value.GetTimestampValue().AsTime().UnixMicro()
encoded, _ := proxycore.EncodeType(datatype.Timestamp, primitive.ProtocolVersion4, timestamp)
keyValMap[cn] = encoded
} else if isAggregate {
var val interface{}
switch v := value.Kind.(type) {
case *btpb.Value_FloatValue:
val = v.FloatValue
case *btpb.Value_IntValue:
val = v.IntValue
case *btpb.Value_BytesValue:
val = value.GetBytesValue()
case nil:
val = float64(0)
default:
th.Logger.Error("Unsupported value type for recieved in response")
return
}
keyValMap[cn] = val
} else {
keyValMap[cn] = extractValue(value)
}
if (*rowMap)[cfName] == nil {
(*rowMap)[cfName] = make(map[string]interface{})
}
if existingMap, ok := (*rowMap)[cfName].(map[string]interface{}); ok {
for k, v := range keyValMap {
existingMap[k] = v
}
} else {
(*rowMap)[cfName] = keyValMap
}
} else {
if isWritetime {
timestamp := value.GetTimestampValue().AsTime().UnixMicro()
encoded, _ := proxycore.EncodeType(datatype.Timestamp, primitive.ProtocolVersion4, timestamp)
(*rowMap)[cn] = encoded
} else if isAggregate {
var val interface{}
switch v := value.Kind.(type) {
case *btpb.Value_FloatValue:
val = v.FloatValue
case *btpb.Value_IntValue:
val = v.IntValue
case *btpb.Value_BytesValue:
val = value.GetBytesValue()
case nil:
val = float64(0)
default:
th.Logger.Error("Unsupported value type for recieved in response")
return
}
(*rowMap)[cn] = val
} else {
(*rowMap)[cn] = extractValue(value)
}
}
}
}