func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler/responsehandler.go [154:261]


func (th *TypeHandler) ProcessArray(value *btpb.Value, rowMap *map[string]interface{}, cfName string, query QueryMetadata, cn string, isWritetime bool, isAggregate bool) {
	if IsArrayType(value) {
		arr := value.GetArrayValue().Values
		length := len(arr)
		var key string
		var byteValue []byte
		for i, v := range arr {
			if IsArrayType(v) {
				th.ProcessArray(v, rowMap, cfName, query, cn, isWritetime, isAggregate)
			} else if length == 2 && i == 0 {
				key = string(v.GetBytesValue())
			} else if length == 2 && i == 1 {
				byteValue = v.GetBytesValue()
			}
		}
		if key != "" {
			if cfName != th.SchemaMappingConfig.SystemColumnFamily {
				keyValMap := make(map[string]interface{})
				keyValMap[key] = byteValue

				if (*rowMap)[cfName] == nil {
					(*rowMap)[cfName] = make([]Maptype, 0)
				}
				if existingMap, ok := (*rowMap)[cfName].([]Maptype); ok {
					for k, v := range keyValMap {
						existingMap = append(existingMap, Maptype{Key: k, Value: v})
					}
					(*rowMap)[cfName] = existingMap
				} else {
					(*rowMap)[cfName] = keyValMap
				}
			} else {
				(*rowMap)[key] = byteValue
			}
		}
	} else if slices.Contains(query.PrimaryKeys, cfName) {
		(*rowMap)[cfName] = extractValue(value)
	} else {
		cf := func() string {
			if HasDollarSymbolPrefix(cfName) {
				return query.DefaultColumnFamily
			}
			return cfName
		}()
		// primary key results don't look like they're part of a column family so handle them separately
		if cf != th.SchemaMappingConfig.SystemColumnFamily {
			keyValMap := make(map[string]interface{})
			if isWritetime {
				timestamp := value.GetTimestampValue().AsTime().UnixMicro()
				encoded, _ := proxycore.EncodeType(datatype.Timestamp, primitive.ProtocolVersion4, timestamp)
				keyValMap[cn] = encoded
			} else if isAggregate {
				var val interface{}
				switch v := value.Kind.(type) {
				case *btpb.Value_FloatValue:
					val = v.FloatValue
				case *btpb.Value_IntValue:
					val = v.IntValue
				case *btpb.Value_BytesValue:
					val = value.GetBytesValue()
				case nil:
					val = float64(0)
				default:
					th.Logger.Error("Unsupported value type for recieved in response")
					return
				}
				keyValMap[cn] = val
			} else {
				keyValMap[cn] = extractValue(value)
			}
			if (*rowMap)[cfName] == nil {
				(*rowMap)[cfName] = make(map[string]interface{})
			}
			if existingMap, ok := (*rowMap)[cfName].(map[string]interface{}); ok {
				for k, v := range keyValMap {
					existingMap[k] = v
				}
			} else {
				(*rowMap)[cfName] = keyValMap
			}
		} else {
			if isWritetime {
				timestamp := value.GetTimestampValue().AsTime().UnixMicro()
				encoded, _ := proxycore.EncodeType(datatype.Timestamp, primitive.ProtocolVersion4, timestamp)
				(*rowMap)[cn] = encoded
			} else if isAggregate {
				var val interface{}
				switch v := value.Kind.(type) {
				case *btpb.Value_FloatValue:
					val = v.FloatValue
				case *btpb.Value_IntValue:
					val = v.IntValue
				case *btpb.Value_BytesValue:
					val = value.GetBytesValue()
				case nil:
					val = float64(0)
				default:
					th.Logger.Error("Unsupported value type for recieved in response")
					return
				}
				(*rowMap)[cn] = val

			} else {
				(*rowMap)[cn] = extractValue(value)
			}
		}
	}
}