func toMaxComputeData()

in odps/tableschema/arrow_util.go [193:405]


func toMaxComputeData(vector array.Interface, index int, typeInfo datatype.DataType, cfg *arrowOptions) (data.Data, error) {
	switch typeInfo.ID() {
	case datatype.BOOLEAN:
		value := vector.(*array.Boolean).Value(index)
		return data.Bool(value), nil
	case datatype.TINYINT:
		value := vector.(*array.Int8).Value(index)
		return data.TinyInt(value), nil
	case datatype.SMALLINT:
		value := vector.(*array.Int16).Value(index)
		return data.SmallInt(value), nil
	case datatype.INT:
		value := vector.(*array.Int32).Value(index)
		return data.Int(value), nil
	case datatype.BIGINT:
		value := vector.(*array.Int64).Value(index)
		return data.BigInt(value), nil
	case datatype.FLOAT:
		value := vector.(*array.Float32).Value(index)
		return data.Float(value), nil
	case datatype.DOUBLE:
		value := vector.(*array.Float64).Value(index)
		return data.Double(value), nil
	case datatype.CHAR, datatype.VARCHAR, datatype.STRING, datatype.JSON:
		value := vector.(*array.String).Value(index)
		return data.String(value), nil
	case datatype.BINARY:
		value := vector.(*array.Binary).Value(index)
		return data.Binary(value), nil
	case datatype.DATE:
		value := vector.(*array.Date32).Value(index)
		// Date32 从 epoch(1970-01-01)起的天数
		days := int64(value)
		// 将天数转换为 time.Duration,并加到 epoch 时间上
		return data.Date(time.Unix(0, 0).AddDate(0, 0, int(days))), nil
	case datatype.DATETIME:
		value := vector.(*array.Timestamp).Value(index)
		epochTime := int64(value)
		switch cfg.DatetimeUnit {
		case Second:
			return data.DateTime(time.Unix(epochTime, 0)), nil
		case Milli:
			return data.DateTime(time.Unix(epochTime/1e3, (epochTime%1e3)*1e6)), nil
		case Micro:
			return data.DateTime(time.Unix(epochTime/1e6, (epochTime%1e6)*1e3)), nil
		case Nano:
			return data.DateTime(time.Unix(0, epochTime)), nil
		}
	case datatype.TIMESTAMP:
		if cfg.ExtendedMode {
			sec := vector.(*array.Struct).Field(0).(*array.Int64).Value(index)
			nano := vector.(*array.Struct).Field(1).(*array.Int32).Value(index)
			return data.Timestamp(time.Unix(sec, int64(nano))), nil
		} else {
			value := vector.(*array.Timestamp).Value(index)
			epochTime := int64(value)
			switch cfg.TimestampUnit {
			case Second:
				return data.Timestamp(time.Unix(epochTime, 0)), nil
			case Milli:
				return data.Timestamp(time.Unix(epochTime/1e3, (epochTime%1e3)*1e6)), nil
			case Micro:
				return data.Timestamp(time.Unix(epochTime/1e6, (epochTime%1e6)*1e3)), nil
			case Nano:
				return data.Timestamp(time.Unix(0, epochTime)), nil
			}
		}
	case datatype.TIMESTAMP_NTZ:
		if cfg.ExtendedMode {
			sec := vector.(*array.Struct).Field(0).(*array.Int64).Value(index)
			nano := vector.(*array.Struct).Field(1).(*array.Int32).Value(index)
			return data.TimestampNtz(time.Unix(sec, int64(nano))), nil
		} else {
			value := vector.(*array.Timestamp).Value(index)
			epochTime := int64(value)
			switch cfg.TimestampUnit {
			case Second:
				return data.TimestampNtz(time.Unix(epochTime, 0)), nil
			case Milli:
				return data.TimestampNtz(time.Unix(epochTime/1e3, (epochTime%1e3)*1e6)), nil
			case Micro:
				return data.TimestampNtz(time.Unix(epochTime/1e6, (epochTime%1e6)*1e3)), nil
			case Nano:
				return data.TimestampNtz(time.Unix(0, epochTime)), nil
			}
		}
	case datatype.DECIMAL:
		decimalType := typeInfo.(datatype.DecimalType)
		if cfg.ExtendedMode {
			fixedSizeBinaryVector, extendedMode := vector.(*array.FixedSizeBinary)
			if extendedMode {
				val := fixedSizeBinaryVector.Value(index)
				if len(val) < 8 {
					return nil, errors.Errorf("Unrecognized Decimal type, val len %d", len(val))
				}

				mSign := val[1]
				mIntg := val[2]
				mFrac := val[3]

				var decimalBuilder strings.Builder

				if mSign > 0 {
					decimalBuilder.WriteString("-")
				}

				for j := int(mIntg); j > 0; j-- {
					num := int(binary.LittleEndian.Uint32(val[8+j*4 : 12+j*4]))

					if j == int(mIntg) {
						decimalBuilder.WriteString(fmt.Sprintf("%d", num))
					} else {
						decimalBuilder.WriteString(fmt.Sprintf("%09d", num))
					}
				}

				decimalBuilder.WriteString(".")
				for j := 0; j < int(mFrac); j++ {
					num := int(binary.LittleEndian.Uint32(val[8-4*j : 8-4*j+4]))
					decimalBuilder.WriteString(fmt.Sprintf("%09d", num))
				}

				// trim trailing zeros
				result := decimalBuilder.String()
				result = strings.TrimRight(result, "0")
				if strings.HasSuffix(result, ".") {
					result = result[:len(result)-1] // 移除多余的小数点
				}
				decimal := data.NewDecimal(int(decimalType.Precision), int(decimalType.Scale), result)
				return decimal, nil
			}
		}
		value := vector.(*array.Decimal128).Value(index)
		decimal := data.NewDecimalFromValue(int(decimalType.Precision), int(decimalType.Scale), value.BigInt())
		return decimal, nil
	case datatype.ARRAY:
		arrayType := typeInfo.(datatype.ArrayType)

		// 处理 LIST 类型
		listCol := vector.(*array.List)

		// 获取偏移值,包括第 i 个列表对应的偏移量
		offsets := listCol.Offsets()
		start := offsets[index] // 当前列表的起始位置
		end := offsets[index+1] // 下一个列表的起始位置

		numElements := end - start                      // 当前列表的元素数量
		listData := make([]interface{}, 0, numElements) // 创建容量为当前列表长度的切片

		// 获取当前列表的值
		childArray := listCol.ListValues() // 获取子列表,这里通常是一个 Array 接口

		// 遍历子列表中的实际元素
		for j := start; j < end; j++ {
			elementData, err := toMaxComputeData(childArray, int(j-start), arrayType.ElementType, cfg)
			if err != nil {
				return nil, err
			}
			listData = append(listData, elementData)
		}
		return data.ArrayFromSlice(listData...)

	case datatype.MAP:
		mapType := typeInfo.(datatype.MapType)
		// 处理 MAP 类型
		mapCol := vector.(*array.Map)

		offsets := mapCol.Offsets()
		start := offsets[index]
		end := offsets[index+1]

		mapData := make(map[interface{}]interface{})
		keys := mapCol.Keys()
		values := mapCol.Items()

		for j := start; j < end; j++ {
			keyData, err := toMaxComputeData(keys, int(j-start), mapType.KeyType, cfg)
			if err != nil {
				return nil, err
			}
			valueData, err := toMaxComputeData(values, int(j-start), mapType.ValueType, cfg)
			if err != nil {
				return nil, err
			}
			mapData[keyData] = valueData
		}
		return data.MapFromGoMap(mapData)

	case datatype.STRUCT:
		structType := typeInfo.(datatype.StructType)
		// 处理 STRUCT 类型
		structCol := vector.(*array.Struct)

		structData := data.NewStructWithTyp(structType)

		for fieldIndex := 0; fieldIndex < structCol.NumField(); fieldIndex++ {
			fieldType := structType.Fields[fieldIndex]
			fieldName := fieldType.Name

			fieldArray := structCol.Field(fieldIndex)
			fieldValue, err := toMaxComputeData(fieldArray, index, fieldType.Type, cfg)
			if err != nil {
				return nil, err
			}
			err = structData.SetField(fieldName, fieldValue)
			if err != nil {
				return nil, err
			}
		}
		return structData, nil
	}
	return nil, fmt.Errorf("unsupported ODPS type: %v", typeInfo.Name())
}