table/arrow_utils.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package table
import (
"context"
"fmt"
"iter"
"slices"
"strconv"
"strings"
"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/bitutil"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/extensions"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
tblutils "github.com/apache/iceberg-go/table/internal"
"github.com/google/uuid"
"github.com/pterm/pterm"
)
// constants to look for as keys in Arrow field metadata
const (
ArrowFieldDocKey = "doc"
	// Arrow schemas generated by the Parquet library use this key to
	// identify the field ID of the source Parquet field. We use it when
	// converting to Iceberg to carry the field IDs over.
ArrowParquetFieldIDKey = "PARQUET:field_id"
)
// ArrowSchemaVisitor is an interface that can be implemented and passed to
// VisitArrowSchema to iterate an Arrow schema and produce a result of type T.
// Implementations may optionally define BeforeField/AfterField methods (and
// the analogous list element and map key/value hooks) to be notified as the
// traversal enters and leaves each field.
type ArrowSchemaVisitor[T any] interface {
Schema(*arrow.Schema, T) T
Struct(*arrow.StructType, []T) T
Field(arrow.Field, T) T
List(arrow.ListLikeType, T) T
Map(mt *arrow.MapType, keyResult T, valueResult T) T
Primitive(arrow.DataType) T
}
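// VisitArrowSchema recursively visits sc with the given visitor, calling the
// appropriate method for each schema, struct, field, list, map, and primitive
// encountered, and returns the visitor's final result.
//
// A minimal sketch of a visitor (the primitiveCounter type is hypothetical,
// not part of this package) that counts the primitive fields in a schema:
//
//	type primitiveCounter struct{}
//
//	func (primitiveCounter) Schema(_ *arrow.Schema, result int) int { return result }
//	func (primitiveCounter) Struct(_ *arrow.StructType, results []int) int {
//		total := 0
//		for _, r := range results {
//			total += r
//		}
//		return total
//	}
//	func (primitiveCounter) Field(_ arrow.Field, result int) int     { return result }
//	func (primitiveCounter) List(_ arrow.ListLikeType, elem int) int { return elem }
//	func (primitiveCounter) Map(_ *arrow.MapType, key, val int) int  { return key + val }
//	func (primitiveCounter) Primitive(arrow.DataType) int            { return 1 }
//
//	count, err := VisitArrowSchema[int](sc, primitiveCounter{})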
func VisitArrowSchema[T any](sc *arrow.Schema, visitor ArrowSchemaVisitor[T]) (res T, err error) {
if sc == nil {
err = fmt.Errorf("%w: cannot visit nil arrow schema", iceberg.ErrInvalidArgument)
return
}
defer internal.RecoverError(&err)
return visitor.Schema(sc, visitArrowStruct(arrow.StructOf(sc.Fields()...), visitor)), err
}
func visitArrowField[T any](f arrow.Field, visitor ArrowSchemaVisitor[T]) T {
switch typ := f.Type.(type) {
case *arrow.StructType:
return visitArrowStruct(typ, visitor)
case *arrow.MapType:
return visitArrowMap(typ, visitor)
case arrow.ListLikeType:
return visitArrowList(typ, visitor)
default:
return visitor.Primitive(typ)
}
}
func visitArrowStruct[T any](dt *arrow.StructType, visitor ArrowSchemaVisitor[T]) T {
type (
beforeField interface {
BeforeField(arrow.Field)
}
afterField interface {
AfterField(arrow.Field)
}
)
results := make([]T, dt.NumFields())
bf, _ := visitor.(beforeField)
af, _ := visitor.(afterField)
for i, f := range dt.Fields() {
if bf != nil {
bf.BeforeField(f)
}
res := visitArrowField(f, visitor)
if af != nil {
af.AfterField(f)
}
results[i] = visitor.Field(f, res)
}
return visitor.Struct(dt, results)
}
func visitArrowMap[T any](dt *arrow.MapType, visitor ArrowSchemaVisitor[T]) T {
type (
beforeMapKey interface {
BeforeMapKey(arrow.Field)
}
beforeMapValue interface {
BeforeMapValue(arrow.Field)
}
afterMapKey interface {
AfterMapKey(arrow.Field)
}
afterMapValue interface {
AfterMapValue(arrow.Field)
}
)
key, val := dt.KeyField(), dt.ItemField()
if bmk, ok := visitor.(beforeMapKey); ok {
bmk.BeforeMapKey(key)
}
keyResult := visitArrowField(key, visitor)
if amk, ok := visitor.(afterMapKey); ok {
amk.AfterMapKey(key)
}
if bmv, ok := visitor.(beforeMapValue); ok {
bmv.BeforeMapValue(val)
}
valueResult := visitArrowField(val, visitor)
if amv, ok := visitor.(afterMapValue); ok {
amv.AfterMapValue(val)
}
return visitor.Map(dt, keyResult, valueResult)
}
func visitArrowList[T any](dt arrow.ListLikeType, visitor ArrowSchemaVisitor[T]) T {
type (
beforeListElem interface {
BeforeListElement(arrow.Field)
}
afterListElem interface {
AfterListElement(arrow.Field)
}
)
elemField := dt.ElemField()
if bl, ok := visitor.(beforeListElem); ok {
bl.BeforeListElement(elemField)
}
res := visitArrowField(elemField, visitor)
if al, ok := visitor.(afterListElem); ok {
al.AfterListElement(elemField)
}
return visitor.List(dt, res)
}
func getFieldID(f arrow.Field) *int {
if !f.HasMetadata() {
return nil
}
fieldIDStr, ok := f.Metadata.GetValue(ArrowParquetFieldIDKey)
if !ok {
return nil
}
id, err := strconv.Atoi(fieldIDStr)
if err != nil {
return nil
}
if id > 0 {
return &id
}
return nil
}
type hasIDs struct{}
func (hasIDs) Schema(sc *arrow.Schema, result bool) bool {
return result
}
func (hasIDs) Struct(st *arrow.StructType, results []bool) bool {
return !slices.Contains(results, false)
}
func (hasIDs) Field(f arrow.Field, result bool) bool {
return getFieldID(f) != nil
}
func (hasIDs) List(dt arrow.ListLikeType, elem bool) bool {
elemField := dt.ElemField()
return elem && getFieldID(elemField) != nil
}
func (hasIDs) Map(m *arrow.MapType, key, val bool) bool {
return key && val &&
getFieldID(m.KeyField()) != nil && getFieldID(m.ItemField()) != nil
}
func (hasIDs) Primitive(arrow.DataType) bool { return true }
type convertToIceberg struct {
downcastTimestamp bool
fieldID func(arrow.Field) int
}
func (convertToIceberg) Schema(_ *arrow.Schema, result iceberg.NestedField) iceberg.NestedField {
return result
}
func (convertToIceberg) Struct(_ *arrow.StructType, results []iceberg.NestedField) iceberg.NestedField {
return iceberg.NestedField{
Type: &iceberg.StructType{FieldList: results},
}
}
func (c convertToIceberg) Field(field arrow.Field, result iceberg.NestedField) iceberg.NestedField {
result.ID = c.fieldID(field)
if field.HasMetadata() {
if doc, ok := field.Metadata.GetValue(ArrowFieldDocKey); ok {
result.Doc = doc
}
}
result.Required = !field.Nullable
result.Name = field.Name
return result
}
func (c convertToIceberg) List(dt arrow.ListLikeType, elemResult iceberg.NestedField) iceberg.NestedField {
elemField := dt.ElemField()
elemID := c.fieldID(elemField)
return iceberg.NestedField{
Type: &iceberg.ListType{
ElementID: elemID,
Element: elemResult.Type,
ElementRequired: !elemField.Nullable,
},
}
}
func (c convertToIceberg) Map(m *arrow.MapType, keyResult, valueResult iceberg.NestedField) iceberg.NestedField {
keyField, valField := m.KeyField(), m.ItemField()
keyID, valID := c.fieldID(keyField), c.fieldID(valField)
return iceberg.NestedField{
Type: &iceberg.MapType{
KeyID: keyID,
KeyType: keyResult.Type,
ValueID: valID,
ValueType: valueResult.Type,
ValueRequired: !valField.Nullable,
},
}
}
var utcAliases = []string{"UTC", "+00:00", "Etc/UTC", "Z"}
func (c convertToIceberg) Primitive(dt arrow.DataType) (result iceberg.NestedField) {
switch dt := dt.(type) {
case *arrow.DictionaryType:
if _, ok := dt.ValueType.(arrow.NestedType); ok {
panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt))
}
return c.Primitive(dt.ValueType)
case *arrow.RunEndEncodedType:
if _, ok := dt.Encoded().(arrow.NestedType); ok {
panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt))
}
return c.Primitive(dt.Encoded())
case *arrow.BooleanType:
result.Type = iceberg.PrimitiveTypes.Bool
case *arrow.Uint8Type, *arrow.Uint16Type, *arrow.Uint32Type,
*arrow.Int8Type, *arrow.Int16Type, *arrow.Int32Type:
result.Type = iceberg.PrimitiveTypes.Int32
case *arrow.Uint64Type, *arrow.Int64Type:
result.Type = iceberg.PrimitiveTypes.Int64
case *arrow.Float16Type, *arrow.Float32Type:
result.Type = iceberg.PrimitiveTypes.Float32
case *arrow.Float64Type:
result.Type = iceberg.PrimitiveTypes.Float64
case *arrow.Decimal32Type, *arrow.Decimal64Type, *arrow.Decimal128Type:
dec := dt.(arrow.DecimalType)
result.Type = iceberg.DecimalTypeOf(int(dec.GetPrecision()), int(dec.GetScale()))
case *arrow.StringType, *arrow.LargeStringType:
result.Type = iceberg.PrimitiveTypes.String
case *arrow.BinaryType, *arrow.LargeBinaryType:
result.Type = iceberg.PrimitiveTypes.Binary
case *arrow.Date32Type:
result.Type = iceberg.PrimitiveTypes.Date
case *arrow.Time64Type:
if dt.Unit == arrow.Microsecond {
result.Type = iceberg.PrimitiveTypes.Time
} else {
panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt))
}
case *arrow.TimestampType:
if dt.Unit == arrow.Nanosecond {
if !c.downcastTimestamp {
panic(fmt.Errorf("%w: 'ns' timestamp precision not supported", iceberg.ErrType))
}
// TODO: log something
}
if slices.Contains(utcAliases, dt.TimeZone) {
result.Type = iceberg.PrimitiveTypes.TimestampTz
} else if dt.TimeZone == "" {
result.Type = iceberg.PrimitiveTypes.Timestamp
} else {
panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt))
}
case *arrow.FixedSizeBinaryType:
result.Type = iceberg.FixedTypeOf(dt.ByteWidth)
case arrow.ExtensionType:
if dt.ExtensionName() == "arrow.uuid" {
result.Type = iceberg.PrimitiveTypes.UUID
} else {
panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt))
}
default:
panic(fmt.Errorf("%w: unsupported arrow type for conversion - %s", iceberg.ErrInvalidSchema, dt))
}
return
}
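// ArrowTypeToIceberg converts dt to the equivalent Iceberg type, returning an
// error if the Arrow type has no Iceberg equivalent. If downcastNsTimestamp is
// true, nanosecond timestamps are accepted and mapped to Iceberg's
// microsecond-precision timestamp types.
//
// A small usage sketch; the expected results follow from the conversion rules
// above, not from a test in this file:
//
//	typ, err := ArrowTypeToIceberg(arrow.PrimitiveTypes.Int32, false)
//	// typ == iceberg.PrimitiveTypes.Int32
//
//	typ, err = ArrowTypeToIceberg(&arrow.TimestampType{
//		Unit: arrow.Microsecond, TimeZone: "UTC"}, false)
//	// typ == iceberg.PrimitiveTypes.TimestampTz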
func ArrowTypeToIceberg(dt arrow.DataType, downcastNsTimestamp bool) (iceberg.Type, error) {
sc := arrow.NewSchema([]arrow.Field{{
Type: dt,
Metadata: arrow.NewMetadata([]string{ArrowParquetFieldIDKey}, []string{"1"}),
}}, nil)
out, err := VisitArrowSchema(sc, convertToIceberg{
downcastTimestamp: downcastNsTimestamp,
fieldID: func(field arrow.Field) int {
if id := getFieldID(field); id != nil {
return *id
}
panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing field_id",
iceberg.ErrInvalidSchema, field))
},
})
if err != nil {
return nil, err
}
return out.Type.(*iceberg.StructType).FieldList[0].Type, nil
}
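// ArrowSchemaToIceberg converts an Arrow schema to an Iceberg schema. Field
// IDs are taken from the PARQUET:field_id metadata if every field carries one;
// otherwise, if nameMapping is non-nil, it is applied to assign IDs by name.
// If neither source of field IDs is available, an error is returned.
//
// A minimal usage sketch, assuming arrSchema was read from a Parquet file and
// therefore carries field-id metadata:
//
//	icebergSchema, err := ArrowSchemaToIceberg(arrSchema, false, nil)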
func ArrowSchemaToIceberg(sc *arrow.Schema, downcastNsTimestamp bool, nameMapping iceberg.NameMapping) (*iceberg.Schema, error) {
hasIDs, _ := VisitArrowSchema(sc, hasIDs{})
switch {
case hasIDs:
out, err := VisitArrowSchema(sc, convertToIceberg{
downcastTimestamp: downcastNsTimestamp,
fieldID: func(field arrow.Field) int {
if id := getFieldID(field); id != nil {
return *id
}
panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing field_id",
iceberg.ErrInvalidSchema, field))
},
})
if err != nil {
return nil, err
}
return iceberg.NewSchema(0, out.Type.(*iceberg.StructType).FieldList...), nil
case nameMapping != nil:
schemaWithoutIDs, err := arrowToSchemaWithoutIDs(sc, downcastNsTimestamp)
if err != nil {
return nil, err
}
return iceberg.ApplyNameMapping(schemaWithoutIDs, nameMapping)
default:
return nil, fmt.Errorf("%w: arrow schema does not have field-ids and no name mapping provided",
iceberg.ErrInvalidSchema)
}
}
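// ArrowSchemaToIcebergWithFreshIDs converts an Arrow schema to an Iceberg
// schema, ignoring any existing field-id metadata and assigning fresh field
// IDs instead, as is appropriate when creating a new table from an existing
// Arrow schema.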
func ArrowSchemaToIcebergWithFreshIDs(sc *arrow.Schema, downcastNsTimestamp bool) (*iceberg.Schema, error) {
schemaWithoutIDs, err := arrowToSchemaWithoutIDs(sc, downcastNsTimestamp)
if err != nil {
return nil, err
}
return iceberg.AssignFreshSchemaIDs(schemaWithoutIDs, nil)
}
func arrowToSchemaWithoutIDs(sc *arrow.Schema, downcastNsTimestamp bool) (*iceberg.Schema, error) {
withoutIDs, err := VisitArrowSchema(sc, convertToIceberg{
downcastTimestamp: downcastNsTimestamp,
fieldID: func(_ arrow.Field) int { return -1 },
})
if err != nil {
return nil, err
}
schemaWithoutIDs := iceberg.NewSchema(0, withoutIDs.Type.(*iceberg.StructType).FieldList...)
return schemaWithoutIDs, nil
}
type convertToSmallTypes struct{}
func (convertToSmallTypes) Schema(_ *arrow.Schema, structResult arrow.Field) arrow.Field {
return structResult
}
func (convertToSmallTypes) Struct(_ *arrow.StructType, results []arrow.Field) arrow.Field {
return arrow.Field{Type: arrow.StructOf(results...)}
}
func (convertToSmallTypes) Field(field arrow.Field, fieldResult arrow.Field) arrow.Field {
field.Type = fieldResult.Type
return field
}
func (convertToSmallTypes) List(_ arrow.ListLikeType, elemResult arrow.Field) arrow.Field {
return arrow.Field{Type: arrow.ListOfField(elemResult)}
}
func (convertToSmallTypes) Map(_ *arrow.MapType, keyResult, valueResult arrow.Field) arrow.Field {
return arrow.Field{
Type: arrow.MapOfWithMetadata(keyResult.Type, keyResult.Metadata,
valueResult.Type, valueResult.Metadata),
}
}
func (convertToSmallTypes) Primitive(dt arrow.DataType) arrow.Field {
switch dt.ID() {
case arrow.LARGE_STRING:
dt = arrow.BinaryTypes.String
case arrow.LARGE_BINARY:
dt = arrow.BinaryTypes.Binary
}
return arrow.Field{Type: dt}
}
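// ensureSmallArrowTypes rewrites dt, replacing any large variants
// (LargeString, LargeBinary, and large lists) with their 32-bit-offset
// counterparts while leaving all other types untouched.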
func ensureSmallArrowTypes(dt arrow.DataType) (arrow.DataType, error) {
top, err := VisitArrowSchema(arrow.NewSchema([]arrow.Field{{Type: dt}}, nil), convertToSmallTypes{})
if err != nil {
return nil, err
}
return top.Type.(*arrow.StructType).Field(0).Type, nil
}
type convertToArrow struct {
metadata map[string]string
includeFieldIDs bool
useLargeTypes bool
}
func (c convertToArrow) Schema(_ *iceberg.Schema, result arrow.Field) arrow.Field {
result.Metadata = arrow.MetadataFrom(c.metadata)
return result
}
func (c convertToArrow) Struct(_ iceberg.StructType, results []arrow.Field) arrow.Field {
return arrow.Field{Type: arrow.StructOf(results...)}
}
func (c convertToArrow) Field(field iceberg.NestedField, result arrow.Field) arrow.Field {
meta := map[string]string{}
if len(field.Doc) > 0 {
meta[ArrowFieldDocKey] = field.Doc
}
if c.includeFieldIDs {
meta[ArrowParquetFieldIDKey] = strconv.Itoa(field.ID)
}
if len(meta) > 0 {
result.Metadata = arrow.MetadataFrom(meta)
}
result.Name, result.Nullable = field.Name, !field.Required
return result
}
func (c convertToArrow) List(list iceberg.ListType, elemResult arrow.Field) arrow.Field {
elemField := c.Field(list.ElementField(), elemResult)
if c.useLargeTypes {
return arrow.Field{Type: arrow.LargeListOfField(elemField)}
}
return arrow.Field{Type: arrow.ListOfField(elemField)}
}
func (c convertToArrow) Map(m iceberg.MapType, keyResult, valResult arrow.Field) arrow.Field {
keyField := c.Field(m.KeyField(), keyResult)
valField := c.Field(m.ValueField(), valResult)
return arrow.Field{Type: arrow.MapOfFields(keyField, valField)}
}
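// Primitive is unreachable here: convertToArrow implements the per-primitive
// visitor methods below, so iceberg.Visit dispatches to those instead.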
func (c convertToArrow) Primitive(iceberg.PrimitiveType) arrow.Field { panic("shouldn't be called") }
func (c convertToArrow) VisitFixed(f iceberg.FixedType) arrow.Field {
return arrow.Field{Type: &arrow.FixedSizeBinaryType{ByteWidth: f.Len()}}
}
func (c convertToArrow) VisitDecimal(d iceberg.DecimalType) arrow.Field {
return arrow.Field{Type: &arrow.Decimal128Type{
Precision: int32(d.Precision()), Scale: int32(d.Scale()),
}}
}
func (c convertToArrow) VisitBoolean() arrow.Field {
return arrow.Field{Type: arrow.FixedWidthTypes.Boolean}
}
func (c convertToArrow) VisitInt32() arrow.Field {
return arrow.Field{Type: arrow.PrimitiveTypes.Int32}
}
func (c convertToArrow) VisitInt64() arrow.Field {
return arrow.Field{Type: arrow.PrimitiveTypes.Int64}
}
func (c convertToArrow) VisitFloat32() arrow.Field {
return arrow.Field{Type: arrow.PrimitiveTypes.Float32}
}
func (c convertToArrow) VisitFloat64() arrow.Field {
return arrow.Field{Type: arrow.PrimitiveTypes.Float64}
}
func (c convertToArrow) VisitDate() arrow.Field {
return arrow.Field{Type: arrow.FixedWidthTypes.Date32}
}
func (c convertToArrow) VisitTime() arrow.Field {
return arrow.Field{Type: arrow.FixedWidthTypes.Time64us}
}
func (c convertToArrow) VisitTimestampTz() arrow.Field {
return arrow.Field{Type: arrow.FixedWidthTypes.Timestamp_us}
}
func (c convertToArrow) VisitTimestamp() arrow.Field {
return arrow.Field{Type: &arrow.TimestampType{Unit: arrow.Microsecond}}
}
func (c convertToArrow) VisitString() arrow.Field {
if c.useLargeTypes {
return arrow.Field{Type: arrow.BinaryTypes.LargeString}
}
return arrow.Field{Type: arrow.BinaryTypes.String}
}
func (c convertToArrow) VisitBinary() arrow.Field {
if c.useLargeTypes {
return arrow.Field{Type: arrow.BinaryTypes.LargeBinary}
}
return arrow.Field{Type: arrow.BinaryTypes.Binary}
}
func (c convertToArrow) VisitUUID() arrow.Field {
return arrow.Field{Type: extensions.NewUUIDType()}
}
// SchemaToArrowSchema converts an Iceberg schema to an Arrow schema. If the metadata
// parameter is non-nil, it will be included as the top-level metadata of the resulting
// schema. If includeFieldIDs is true, each field of the schema will contain a metadata
// key PARQUET:field_id set to the field ID from the Iceberg schema. If useLargeTypes
// is true, string, binary, and list types map to their Large Arrow variants.
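//
// A minimal usage sketch, assuming sc is an *iceberg.Schema already in hand:
//
//	arrSchema, err := SchemaToArrowSchema(sc, nil, true, false)
//	if err != nil {
//		return err
//	}
//	// arrSchema's fields now carry PARQUET:field_id metadata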
func SchemaToArrowSchema(sc *iceberg.Schema, metadata map[string]string, includeFieldIDs, useLargeTypes bool) (*arrow.Schema, error) {
top, err := iceberg.Visit(sc, convertToArrow{
metadata: metadata,
includeFieldIDs: includeFieldIDs, useLargeTypes: useLargeTypes,
})
if err != nil {
return nil, err
}
return arrow.NewSchema(top.Type.(*arrow.StructType).Fields(), &top.Metadata), nil
}
// TypeToArrowType converts a given Iceberg type into the equivalent Arrow data type.
// For nested types (List, Struct, Map), if includeFieldIDs is true, the child fields
// will contain a metadata key PARQUET:field_id set to the field ID.
func TypeToArrowType(t iceberg.Type, includeFieldIDs bool, useLargeTypes bool) (arrow.DataType, error) {
top, err := iceberg.Visit(iceberg.NewSchema(0, iceberg.NestedField{Type: t}),
convertToArrow{includeFieldIDs: includeFieldIDs, useLargeTypes: useLargeTypes})
if err != nil {
return nil, err
}
return top.Type.(*arrow.StructType).Field(0).Type, nil
}
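// arrowAccessor resolves the "partner" Arrow array for each field of the
// requested Iceberg schema during VisitSchemaWithPartner: fields are matched
// by ID against fileSchema and then located by name within the Arrow struct.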
type arrowAccessor struct {
fileSchema *iceberg.Schema
}
func (a arrowAccessor) SchemaPartner(partner arrow.Array) arrow.Array {
return partner
}
func (a arrowAccessor) FieldPartner(partnerStruct arrow.Array, fieldID int, _ string) arrow.Array {
if partnerStruct == nil {
return nil
}
field, ok := a.fileSchema.FindFieldByID(fieldID)
if !ok {
return nil
}
if st, ok := partnerStruct.(*array.Struct); ok {
if idx, ok := st.DataType().(*arrow.StructType).FieldIdx(field.Name); ok {
return st.Field(idx)
}
}
panic(fmt.Errorf("cannot find %s in expected partner_struct type %s",
field.Name, partnerStruct.DataType()))
}
func (a arrowAccessor) ListElementPartner(partnerList arrow.Array) arrow.Array {
if l, ok := partnerList.(array.ListLike); ok {
return l.ListValues()
}
return nil
}
func (a arrowAccessor) MapKeyPartner(partnerMap arrow.Array) arrow.Array {
if m, ok := partnerMap.(*array.Map); ok {
return m.Keys()
}
return nil
}
func (a arrowAccessor) MapValuePartner(partnerMap arrow.Array) arrow.Array {
if m, ok := partnerMap.(*array.Map); ok {
return m.Items()
}
return nil
}
func retOrPanic[T any](v T, err error) T {
if err != nil {
panic(err)
}
return v
}
type arrowProjectionVisitor struct {
ctx context.Context
fileSchema *iceberg.Schema
includeFieldIDs bool
downcastNsTimestamp bool
useLargeTypes bool
}
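// castIfNeeded returns vals cast to the Arrow type implied by the requested
// field: if the file's type differs from the requested primitive type, it is
// promoted and cast, and timestamps additionally get unit and timezone
// coercion. When no cast is required, vals is retained and returned unchanged.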
func (a *arrowProjectionVisitor) castIfNeeded(field iceberg.NestedField, vals arrow.Array) arrow.Array {
fileField, ok := a.fileSchema.FindFieldByID(field.ID)
if !ok {
panic(fmt.Errorf("could not find field id %d in schema", field.ID))
}
typ, ok := fileField.Type.(iceberg.PrimitiveType)
if !ok {
vals.Retain()
return vals
}
if !field.Type.Equals(typ) {
promoted := retOrPanic(iceberg.PromoteType(fileField.Type, field.Type))
targetType := retOrPanic(TypeToArrowType(promoted, a.includeFieldIDs, a.useLargeTypes))
if !a.useLargeTypes {
targetType = retOrPanic(ensureSmallArrowTypes(targetType))
}
return retOrPanic(compute.CastArray(a.ctx, vals,
compute.SafeCastOptions(targetType)))
}
targetType := retOrPanic(TypeToArrowType(field.Type, a.includeFieldIDs, a.useLargeTypes))
if !arrow.TypeEqual(targetType, vals.DataType()) {
switch field.Type.(type) {
case iceberg.TimestampType:
tt, tgtok := targetType.(*arrow.TimestampType)
vt, valok := vals.DataType().(*arrow.TimestampType)
if tgtok && valok && tt.TimeZone == "" && vt.TimeZone == "" && tt.Unit == arrow.Microsecond {
if vt.Unit == arrow.Nanosecond && a.downcastNsTimestamp {
return retOrPanic(compute.CastArray(a.ctx, vals, compute.UnsafeCastOptions(tt)))
} else if vt.Unit == arrow.Second || vt.Unit == arrow.Millisecond {
return retOrPanic(compute.CastArray(a.ctx, vals, compute.SafeCastOptions(tt)))
}
}
panic(fmt.Errorf("unsupported schema projection from %s to %s",
vals.DataType(), targetType))
case iceberg.TimestampTzType:
tt, tgtok := targetType.(*arrow.TimestampType)
vt, valok := vals.DataType().(*arrow.TimestampType)
if tgtok && valok && tt.TimeZone == "UTC" &&
slices.Contains(utcAliases, vt.TimeZone) && tt.Unit == arrow.Microsecond {
if vt.Unit == arrow.Nanosecond && a.downcastNsTimestamp {
return retOrPanic(compute.CastArray(a.ctx, vals, compute.UnsafeCastOptions(tt)))
} else if vt.Unit != arrow.Nanosecond {
return retOrPanic(compute.CastArray(a.ctx, vals, compute.SafeCastOptions(tt)))
}
}
panic(fmt.Errorf("unsupported schema projection from %s to %s",
vals.DataType(), targetType))
default:
return retOrPanic(compute.CastArray(a.ctx, vals,
compute.SafeCastOptions(targetType)))
}
}
vals.Retain()
return vals
}
func (a *arrowProjectionVisitor) constructField(field iceberg.NestedField, arrowType arrow.DataType) arrow.Field {
metadata := map[string]string{}
if field.Doc != "" {
metadata[ArrowFieldDocKey] = field.Doc
}
if a.includeFieldIDs {
metadata[ArrowParquetFieldIDKey] = strconv.Itoa(field.ID)
}
return arrow.Field{
Name: field.Name,
Type: arrowType,
Nullable: !field.Required,
Metadata: arrow.MetadataFrom(metadata),
}
}
func (a *arrowProjectionVisitor) Schema(_ *iceberg.Schema, _ arrow.Array, result arrow.Array) arrow.Array {
return result
}
func (a *arrowProjectionVisitor) Struct(st iceberg.StructType, structArr arrow.Array, fieldResults []arrow.Array) arrow.Array {
if structArr == nil {
return nil
}
fieldArrs := make([]arrow.Array, len(st.FieldList))
fields := make([]arrow.Field, len(st.FieldList))
for i, field := range st.FieldList {
arr := fieldResults[i]
if arr != nil {
if _, ok := arr.DataType().(arrow.NestedType); ok {
defer arr.Release()
}
arr = a.castIfNeeded(field, arr)
defer arr.Release()
fieldArrs[i] = arr
fields[i] = a.constructField(field, arr.DataType())
} else if !field.Required {
dt := retOrPanic(TypeToArrowType(field.Type, false, a.useLargeTypes))
arr = array.MakeArrayOfNull(compute.GetAllocator(a.ctx), dt, structArr.Len())
defer arr.Release()
fieldArrs[i] = arr
fields[i] = a.constructField(field, arr.DataType())
} else {
panic(fmt.Errorf("%w: field is required, but could not be found in file: %s",
iceberg.ErrInvalidSchema, field))
}
}
var nullBitmap *memory.Buffer
if structArr.NullN() > 0 {
if structArr.Data().Offset() > 0 {
// the children already accounted for any offset because we used the `Field` method
// on the struct array in the FieldPartner accessor. So we just need to adjust the
// bitmap to account for the offset.
nullBitmap = memory.NewResizableBuffer(compute.GetAllocator(a.ctx))
defer nullBitmap.Release()
nullBitmap.Resize(int(bitutil.BytesForBits(int64(structArr.Len()))))
bitutil.CopyBitmap(structArr.NullBitmapBytes(), structArr.Data().Offset(), structArr.Len(),
nullBitmap.Bytes(), 0)
} else {
nullBitmap = structArr.Data().Buffers()[0]
}
}
return retOrPanic(array.NewStructArrayWithFieldsAndNulls(fieldArrs, fields,
nullBitmap, structArr.NullN(), 0))
}
func (a *arrowProjectionVisitor) Field(_ iceberg.NestedField, _ arrow.Array, fieldArr arrow.Array) arrow.Array {
return fieldArr
}
func (a *arrowProjectionVisitor) List(listType iceberg.ListType, listArr arrow.Array, valArr arrow.Array) arrow.Array {
arr, ok := listArr.(array.ListLike)
if !ok || valArr == nil {
return nil
}
valArr = a.castIfNeeded(listType.ElementField(), valArr)
defer valArr.Release()
var outType arrow.ListLikeType
elemField := a.constructField(listType.ElementField(), valArr.DataType())
switch arr.DataType().ID() {
case arrow.LIST:
outType = arrow.ListOfField(elemField)
case arrow.LARGE_LIST:
outType = arrow.LargeListOfField(elemField)
	case arrow.LIST_VIEW:
		outType = arrow.ListViewOfField(elemField)
	case arrow.LARGE_LIST_VIEW:
		outType = arrow.LargeListViewOfField(elemField)
}
data := array.NewData(outType, arr.Len(), arr.Data().Buffers(),
[]arrow.ArrayData{valArr.Data()}, arr.NullN(), arr.Data().Offset())
defer data.Release()
return array.MakeFromData(data)
}
func (a *arrowProjectionVisitor) Map(m iceberg.MapType, mapArray, keyResult, valResult arrow.Array) arrow.Array {
if keyResult == nil || valResult == nil {
return nil
}
arr, ok := mapArray.(*array.Map)
if !ok {
return nil
}
keys := a.castIfNeeded(m.KeyField(), keyResult)
defer keys.Release()
vals := a.castIfNeeded(m.ValueField(), valResult)
defer vals.Release()
keyField := a.constructField(m.KeyField(), keys.DataType())
valField := a.constructField(m.ValueField(), vals.DataType())
mapType := arrow.MapOfWithMetadata(keyField.Type, keyField.Metadata, valField.Type, valField.Metadata)
childData := array.NewData(mapType.Elem(), arr.Len(), []*memory.Buffer{nil},
[]arrow.ArrayData{keys.Data(), vals.Data()}, 0, 0)
defer childData.Release()
newData := array.NewData(mapType, arr.Len(), arr.Data().Buffers(),
[]arrow.ArrayData{childData}, arr.NullN(), arr.Offset())
defer newData.Release()
return array.NewMapData(newData)
}
func (a *arrowProjectionVisitor) Primitive(_ iceberg.PrimitiveType, arr arrow.Array) arrow.Array {
return arr
}
// ToRequestedSchema constructs a new record batch matching the requested
// Iceberg schema, casting columns to the requested types where necessary.
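//
// A minimal usage sketch, with ctx, requested, fileSchema, and batch assumed
// to be in hand:
//
//	out, err := ToRequestedSchema(ctx, requested, fileSchema, batch, false, false, false)
//	if err != nil {
//		return err
//	}
//	defer out.Release()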
func ToRequestedSchema(ctx context.Context, requested, fileSchema *iceberg.Schema, batch arrow.Record, downcastTimestamp, includeFieldIDs, useLargeTypes bool) (arrow.Record, error) {
st := array.RecordToStructArray(batch)
defer st.Release()
result, err := iceberg.VisitSchemaWithPartner[arrow.Array, arrow.Array](requested, st,
&arrowProjectionVisitor{
ctx: ctx,
fileSchema: fileSchema,
includeFieldIDs: includeFieldIDs,
downcastNsTimestamp: downcastTimestamp,
useLargeTypes: useLargeTypes,
}, arrowAccessor{fileSchema: fileSchema})
if err != nil {
return nil, err
}
out := array.RecordFromStructArray(result.(*array.Struct), nil)
result.Release()
return out, nil
}
type schemaCompatVisitor struct {
provided *iceberg.Schema
errorData pterm.TableData
}
func checkSchemaCompat(requested, provided *iceberg.Schema) error {
sc := &schemaCompatVisitor{
provided: provided,
errorData: pterm.TableData{{"", "Table Field", "Requested Field"}},
}
	_, err := iceberg.PreOrderVisit(requested, sc)
	return err
}
func checkArrowSchemaCompat(requested *iceberg.Schema, provided *arrow.Schema, downcastNanoToMicro bool) error {
mapping := requested.NameMapping()
providedSchema, err := ArrowSchemaToIceberg(provided, downcastNanoToMicro, mapping)
if err != nil {
return err
}
return checkSchemaCompat(requested, providedSchema)
}
func (sc *schemaCompatVisitor) isFieldCompat(lhs iceberg.NestedField) bool {
rhs, ok := sc.provided.FindFieldByID(lhs.ID)
if !ok {
if lhs.Required {
sc.errorData = append(sc.errorData,
[]string{"❌", lhs.String(), "missing"})
return false
}
sc.errorData = append(sc.errorData,
[]string{"✅", lhs.String(), "missing"})
return true
}
if lhs.Required && !rhs.Required {
sc.errorData = append(sc.errorData,
[]string{"❌", lhs.String(), rhs.String()})
return false
}
if lhs.Type.Equals(rhs.Type) {
sc.errorData = append(sc.errorData,
[]string{"✅", lhs.String(), rhs.String()})
return true
}
	// here we only check that the parent node is the same kind of nested type;
	// the types of the child nodes are checked as we traverse into them later
switch lhs.Type.(type) {
case *iceberg.StructType:
if rhs, ok := rhs.Type.(*iceberg.StructType); ok {
sc.errorData = append(sc.errorData,
[]string{"✅", lhs.String(), rhs.String()})
return true
}
case *iceberg.ListType:
if rhs, ok := rhs.Type.(*iceberg.ListType); ok {
sc.errorData = append(sc.errorData,
[]string{"✅", lhs.String(), rhs.String()})
return true
}
case *iceberg.MapType:
if rhs, ok := rhs.Type.(*iceberg.MapType); ok {
sc.errorData = append(sc.errorData,
[]string{"✅", lhs.String(), rhs.String()})
return true
}
}
if _, err := iceberg.PromoteType(rhs.Type, lhs.Type); err != nil {
sc.errorData = append(sc.errorData,
[]string{"❌", lhs.String(), rhs.String()})
return false
}
sc.errorData = append(sc.errorData,
[]string{"✅", lhs.String(), rhs.String()})
return true
}
func (sc *schemaCompatVisitor) Schema(s *iceberg.Schema, v func() bool) bool {
if !v() {
pterm.DisableColor()
tbl := pterm.DefaultTable.WithHasHeader(true).WithData(sc.errorData)
		txt, _ := tbl.Srender()
pterm.EnableColor()
panic("mismatch in fields:\n" + txt)
}
return true
}
func (sc *schemaCompatVisitor) Struct(st iceberg.StructType, v []func() bool) bool {
out := true
for _, res := range v {
out = res() && out
}
return out
}
func (sc *schemaCompatVisitor) Field(n iceberg.NestedField, v func() bool) bool {
return sc.isFieldCompat(n) && v()
}
func (sc *schemaCompatVisitor) List(l iceberg.ListType, v func() bool) bool {
return sc.isFieldCompat(l.ElementField()) && v()
}
func (sc *schemaCompatVisitor) Map(m iceberg.MapType, vk, vv func() bool) bool {
return sc.isFieldCompat(m.KeyField()) && sc.isFieldCompat(m.ValueField()) && vk() && vv()
}
func (sc *schemaCompatVisitor) Primitive(p iceberg.PrimitiveType) bool {
return true
}
func must[T any](v T, err error) T {
if err != nil {
panic(err)
}
return v
}
type arrowStatsCollector struct {
fieldID int
schema *iceberg.Schema
props iceberg.Properties
defaultMode string
}
func (a *arrowStatsCollector) Schema(_ *iceberg.Schema, results func() []tblutils.StatisticsCollector) []tblutils.StatisticsCollector {
return results()
}
func (a *arrowStatsCollector) Struct(_ iceberg.StructType, results []func() []tblutils.StatisticsCollector) []tblutils.StatisticsCollector {
result := make([]tblutils.StatisticsCollector, 0, len(results))
for _, res := range results {
result = append(result, res()...)
}
return result
}
func (a *arrowStatsCollector) Field(field iceberg.NestedField, fieldRes func() []tblutils.StatisticsCollector) []tblutils.StatisticsCollector {
a.fieldID = field.ID
return fieldRes()
}
func (a *arrowStatsCollector) List(list iceberg.ListType, elemResult func() []tblutils.StatisticsCollector) []tblutils.StatisticsCollector {
a.fieldID = list.ElementID
return elemResult()
}
func (a *arrowStatsCollector) Map(m iceberg.MapType, keyResult, valResult func() []tblutils.StatisticsCollector) []tblutils.StatisticsCollector {
a.fieldID = m.KeyID
keyRes := keyResult()
a.fieldID = m.ValueID
valRes := valResult()
return append(keyRes, valRes...)
}
func (a *arrowStatsCollector) Primitive(dt iceberg.PrimitiveType) []tblutils.StatisticsCollector {
colName, ok := a.schema.FindColumnName(a.fieldID)
if !ok {
return []tblutils.StatisticsCollector{}
}
metMode, err := tblutils.MatchMetricsMode(a.defaultMode)
if err != nil {
panic(err)
}
colMode, ok := a.props[MetricsModeColumnConfPrefix+"."+colName]
if ok {
metMode, err = tblutils.MatchMetricsMode(colMode)
if err != nil {
panic(err)
}
}
	// truncation only applies to string and binary columns; for any other
	// type, a truncate mode is upgraded to full metrics
	switch dt.(type) {
	case iceberg.StringType, iceberg.BinaryType:
	default:
		if metMode.Typ == tblutils.MetricModeTruncate {
			metMode = tblutils.MetricsMode{Typ: tblutils.MetricModeFull, Len: 0}
		}
	}
	// truncate and full metrics are only supported for top-level columns;
	// nested fields fall back to counts
	isNested := strings.Contains(colName, ".")
	if isNested && (metMode.Typ == tblutils.MetricModeTruncate || metMode.Typ == tblutils.MetricModeFull) {
		metMode = tblutils.MetricsMode{Typ: tblutils.MetricModeCounts}
	}
return []tblutils.StatisticsCollector{{
FieldID: a.fieldID,
IcebergTyp: dt,
ColName: colName,
Mode: metMode,
}}
}
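// computeStatsPlan maps each field ID in the schema to the statistics
// collector configured for that column via the table's metrics properties.
//
// A sketch of how properties drive the plan, assuming the standard Iceberg
// metrics property keys that DefaultWriteMetricsModeKey and
// MetricsModeColumnConfPrefix refer to:
//
//	props := iceberg.Properties{
//		"write.metadata.metrics.default":     "truncate(16)",
//		"write.metadata.metrics.column.name": "full",
//	}
//	plan, err := computeStatsPlan(schema, props)
//	// plan[fieldID].Mode now reflects the per-column override where present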
func computeStatsPlan(sc *iceberg.Schema, props iceberg.Properties) (map[int]tblutils.StatisticsCollector, error) {
result := make(map[int]tblutils.StatisticsCollector)
visitor := &arrowStatsCollector{
schema: sc, props: props,
defaultMode: props.Get(DefaultWriteMetricsModeKey, DefaultWriteMetricsModeDefault),
}
collectors, err := iceberg.PreOrderVisit(sc, visitor)
if err != nil {
return nil, err
}
for _, entry := range collectors {
result[entry.FieldID] = entry
}
return result, nil
}
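// filesToDataFiles opens each path, verifies that the file carries no field
// IDs and that its schema is compatible with the table's current schema, then
// yields a DataFile with collected statistics for each input file.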
func filesToDataFiles(ctx context.Context, fileIO iceio.IO, meta *MetadataBuilder, paths iter.Seq[string]) iter.Seq2[iceberg.DataFile, error] {
return func(yield func(iceberg.DataFile, error) bool) {
defer func() {
if r := recover(); r != nil {
switch e := r.(type) {
case string:
yield(nil, fmt.Errorf("error encountered during file conversion: %s", e))
case error:
yield(nil, fmt.Errorf("error encountered during file conversion: %w", e))
}
}
}()
currentSchema, currentSpec := meta.CurrentSchema(), meta.CurrentSpec()
for filePath := range paths {
format := tblutils.FormatFromFileName(filePath)
rdr := must(format.Open(ctx, fileIO, filePath))
defer rdr.Close()
arrSchema := must(rdr.Schema())
if hasIDs := must(VisitArrowSchema(arrSchema, hasIDs{})); hasIDs {
yield(nil, fmt.Errorf("%w: cannot add file %s because it has field-ids. add-files only supports the addition of files without field_ids",
iceberg.ErrNotImplemented, filePath))
return
}
if err := checkArrowSchemaCompat(currentSchema, arrSchema, false); err != nil {
yield(nil, err)
return
}
statistics := format.DataFileStatsFromMeta(rdr.Metadata(), must(computeStatsPlan(currentSchema, meta.props)),
must(format.PathToIDMapping(currentSchema)))
df := statistics.ToDataFile(currentSchema, currentSpec, filePath, iceberg.ParquetFile, rdr.SourceFileSize())
if !yield(df, nil) {
return
}
}
}
}
func recordNBytes(rec arrow.Record) (total int64) {
for _, c := range rec.Columns() {
total += int64(c.Data().SizeInBytes())
}
return total
}
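// binPackRecords groups the incoming record stream into slices of records
// bin-packed toward targetFileSize, as measured by recordNBytes, with
// recordLookback passed through as the packing iterator's lookback. An error
// from the source iterator is surfaced as a panic for the caller to recover.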
func binPackRecords(itr iter.Seq2[arrow.Record, error], recordLookback int, targetFileSize int64) iter.Seq[[]arrow.Record] {
return internal.PackingIterator(func(yield func(arrow.Record) bool) {
for rec, err := range itr {
if err != nil {
panic(err)
}
rec.Retain()
if !yield(rec) {
return
}
}
}, targetFileSize, recordLookback, recordNBytes, false)
}
type recordWritingArgs struct {
sc *arrow.Schema
itr iter.Seq2[arrow.Record, error]
fs iceio.WriteFileIO
writeUUID *uuid.UUID
counter iter.Seq[int]
}
func recordsToDataFiles(ctx context.Context, rootLocation string, meta *MetadataBuilder, args recordWritingArgs) (ret iter.Seq2[iceberg.DataFile, error]) {
if args.counter == nil {
args.counter = internal.Counter(0)
}
defer func() {
if r := recover(); r != nil {
var err error
switch e := r.(type) {
case string:
err = fmt.Errorf("error encountered during file writing %s", e)
case error:
err = fmt.Errorf("error encountered during file writing: %w", e)
}
ret = func(yield func(iceberg.DataFile, error) bool) {
yield(nil, err)
}
}
}()
if args.writeUUID == nil {
u := uuid.Must(uuid.NewRandom())
args.writeUUID = &u
}
targetFileSize := int64(meta.props.GetInt(WriteTargetFileSizeBytesKey,
WriteTargetFileSizeBytesDefault))
nameMapping := meta.CurrentSchema().NameMapping()
taskSchema, err := ArrowSchemaToIceberg(args.sc, false, nameMapping)
if err != nil {
panic(err)
}
nextCount, stopCount := iter.Pull(args.counter)
if meta.CurrentSpec().IsUnpartitioned() {
tasks := func(yield func(WriteTask) bool) {
defer stopCount()
for batch := range binPackRecords(args.itr, 20, targetFileSize) {
cnt, _ := nextCount()
t := WriteTask{
Uuid: *args.writeUUID,
ID: cnt,
Schema: taskSchema,
Batches: batch,
}
if !yield(t) {
return
}
}
}
return writeFiles(ctx, rootLocation, args.fs, meta, tasks)
}
panic(fmt.Errorf("%w: write stream with partitions", iceberg.ErrNotImplemented))
}