// Excerpt from cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler/responsehandler.go (lines 362-554).

// BuildResponseRow encodes one Bigtable result row (rowMap) into a Cassandra
// wire-format message.Row, using the column metadata slice cmd and the query
// context to decide each column's effective CQL type and encoding.
//
// Per column it resolves the CQL type — handling aggregate functions,
// writetime columns, SELECT aliases, and map['key'] accesses — and then
// encodes the value:
//   - aggregate/group-by results are converted from the Bigtable numeric type
//     (int64/float64) to the CQL numeric type and encoded as primitives;
//   - collection columns (set/list/map) go through the Handle*Type helpers;
//   - everything else goes through HandlePrimitiveEncoding.
//
// Columns absent from rowMap are emitted as empty cells ([]byte{}).
//
// mapKeyArray supplies, per column index, the concrete key for map['key']
// selections. lastRow marks the final row of the result set; only then are
// cmd entries rewritten to their user-facing names/types (e.g.
// "system.count(col)"), so cmd is mutated as a side effect.
func (th *TypeHandler) BuildResponseRow(rowMap map[string]interface{}, query QueryMetadata, cmd []*message.ColumnMetadata, mapKeyArray []string, lastRow bool) (message.Row, error) {
	var mr message.Row
	for index, metaData := range cmd {
		key := metaData.Name
		// A column missing from the Bigtable row maps to an empty cell.
		if rowMap[key] == nil {
			rowMap[key] = []byte{}
			mr = append(mr, []byte{})
			continue
		}
		value := rowMap[key]

		var cqlType string
		var err error
		var isCollection bool
		// Resolve the effective CQL type for this column.
		col := GetQueryColumn(query, index, key)
		if col.FuncName == "count" {
			// count(...) always yields a bigint regardless of the source column.
			cqlType = "bigint"
		} else if col.IsWriteTimeColumn {
			cqlType = "timestamp"
			// Aliased writetime values arrive wrapped in a map keyed by the alias.
			if _, exists := query.AliasMap[key]; exists {
				val := value.(map[string]interface{})
				value = val[key]
			}
		} else if aliasKey, exists := query.AliasMap[key]; exists {
			// Aliased column: look up metadata under the underlying column name.
			colName := aliasKey.Name
			if col.MapKey != "" {
				colName = col.MapColumnName
			}
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, colName)
			if !isCollection {
				// Non-collection aliased values are wrapped in a map keyed by the alias.
				val := value.(map[string]interface{})
				value = val[aliasKey.Alias]
			}
		} else if col.IsFunc {
			// Function columns are named "<func>_<column>"; strip the function prefix.
			funcColumn := strings.SplitN(key, "_", 2)[1]
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, funcColumn)
		} else if col.MapKey != "" {
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, col.MapColumnName)
		} else {
			cqlType, isCollection, err = th.GetColumnMeta(query.KeyspaceName, query.TableName, key)
		}
		if err != nil {
			return nil, err
		}
		cqlType = strings.ToLower(cqlType)

		if col.IsFunc || query.IsGroupBy {
			if col.FuncName == "count" {
				cqlType = "bigint"
				metaData.Type = datatype.Bigint
				// On the last row, rewrite unaliased count columns to their
				// user-facing "system.count(col)" form.
				if _, exists := query.AliasMap[key]; !exists && lastRow {
					column := fmt.Sprintf("system.%s(%s)", col.FuncName, col.FuncColumnName)
					metaData.Name = column
				}
			}
			var dt datatype.DataType
			var val interface{}
			// For aggregate functions, it specifically handles:
			//   - int64 values: converts to bigint or int based on CQL type
			//   - float64 values: converts to bigint, int, float, or double based on CQL type.
			// Ensuring compatibility between BigTable and Cassandra type systems for aggregated results.
			switch v := value.(type) {
			case int64:
				switch cqlType {
				case "bigint":
					val = v
					dt = datatype.Bigint
				case "int":
					val = int32(v)
					dt = datatype.Int
				default:
					return nil, fmt.Errorf("invalid cqlType - value received type: %v, CqlType: %s", v, cqlType)
				}
			case float64:
				switch cqlType {
				case "bigint":
					val = int64(v)
					dt = datatype.Bigint
				case "int":
					val = int32(v)
					dt = datatype.Int
				case "float":
					val = float32(v)
					dt = datatype.Float
				case "double":
					val = v
					dt = datatype.Double
				default:
					// NOTE: "received" was previously misspelled; kept consistent
					// with the int64 branch above.
					return nil, fmt.Errorf("invalid cqlType - value received type: %v, CqlType: %s", v, cqlType)
				}
			case []byte:
				// Already wire-encoded; pass through untouched.
				mr = append(mr, v)
				continue
			default:
				return nil, fmt.Errorf("unsupported value type received in Bigtable: %v, value: %v, type: %T", cqlType, value, v)
			}
			encoded, err := proxycore.EncodeType(dt, primitive.ProtocolVersion4, val)
			if err != nil {
				return nil, fmt.Errorf("failed to encode value: %v", err)
			}
			value = encoded
			// converting key to function call implementing correct column name for aggregate function call
			if _, exists := query.AliasMap[key]; !exists && lastRow && col.IsFunc {
				column := fmt.Sprintf("system.%s(%s)", col.FuncName, col.FuncColumnName)
				metaData.Name = column
			}

			mr = append(mr, value.([]byte))
			continue
		}
		dt, err := utilities.GetCassandraColumnType(cqlType)
		if err != nil {
			return nil, err
		}
		if isCollection {
			if dt.GetDataTypeCode() == primitive.DataTypeCodeSet {
				setType := dt.(datatype.SetType)
				// Sets arrive as []Maptype; the element is carried in Key.
				setval := []interface{}{}
				for _, val := range value.([]Maptype) {
					setval = append(setval, val.Key)
				}
				err = th.HandleSetType(setval, &mr, setType, query.ProtocalV)
				if err != nil {
					return nil, err
				}
			} else if dt.GetDataTypeCode() == primitive.DataTypeCodeList {
				listType := dt.(datatype.ListType)
				// Lists arrive as []Maptype; the element is carried in Value.
				listVal := []interface{}{}
				for _, val := range value.([]Maptype) {
					listVal = append(listVal, val.Value)
				}
				err = th.HandleListType(listVal, &mr, listType, query.ProtocalV)
				if err != nil {
					return nil, err
				}
			} else if dt.GetDataTypeCode() == primitive.DataTypeCodeMap {
				mapType := dt.(datatype.MapType)
				mapData := make(map[string]interface{})

				mapKey := ""
				if col.MapKey != "" {
					mapKey = mapKeyArray[index]
				}

				// map['key'] selection: emit a single value, not the whole map.
				if mapKey != "" {
					if _, exists := query.AliasMap[key]; !exists && lastRow {
						column := fmt.Sprintf("%s['%s']", col.MapColumnName, col.MapKey)
						metaData.Name = column
					}
					if _, exists := query.AliasMap[key]; exists {
						val := value.(map[string]interface{})
						value = val[key]
					}
					if lastRow {
						// The selected entry has the map's value type, not map type.
						metaData.Type = mapType.GetValueType()
					}
					if value == nil {
						mr = append(mr, []byte{})
						continue
					}
					mr = append(mr, value.([]byte))
					continue
				}

				for _, val := range value.([]Maptype) {
					mapData[val.Key] = val.Value
				}

				// Only varchar- and timestamp-keyed maps are handled; other key
				// types fall through silently with err == nil.
				if mapType.GetKeyType() == datatype.Varchar {
					err = th.HandleMapType(mapData, &mr, mapType, query.ProtocalV)
				} else if mapType.GetKeyType() == datatype.Timestamp {
					err = th.HandleTimestampMap(mapData, &mr, mapType, query.ProtocalV)
				}
				if err != nil {
					th.Logger.Error("Error while Encoding json->bytes -> ", zap.Error(err))
					return nil, fmt.Errorf("failed to retrieve Map data: %v", err)
				}
			}
		} else {
			// Scalar column: encode as a CQL primitive.
			value, err = HandlePrimitiveEncoding(dt, value, query.ProtocalV, true)
			if err != nil {
				return nil, err
			}
			if value == nil {
				mr = append(mr, []byte{})
				continue
			}
			mr = append(mr, value.([]byte))
		}
	}
	return mr, nil
}