table/internal/parquet_files.go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package internal

import (
"context"
"errors"
"fmt"
"maps"
"slices"
"strconv"
"strings"
"unsafe"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/compute"
"github.com/apache/arrow-go/v18/arrow/decimal128"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/apache/arrow-go/v18/parquet"
"github.com/apache/arrow-go/v18/parquet/compress"
"github.com/apache/arrow-go/v18/parquet/file"
"github.com/apache/arrow-go/v18/parquet/metadata"
"github.com/apache/arrow-go/v18/parquet/pqarrow"
"github.com/apache/iceberg-go"
"github.com/apache/iceberg-go/internal"
iceio "github.com/apache/iceberg-go/io"
"github.com/google/uuid"
)
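
// Keys and defaults for the write.parquet.* table properties that control
// how Parquet data files are written: row-group and page sizing, dictionary
// and bloom-filter limits, and compression.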
const (
ParquetRowGroupSizeBytesKey = "write.parquet.row-group-size-bytes"
ParquetRowGroupSizeBytesDefault = 128 * 1024 * 1024 // 128 MB
ParquetRowGroupLimitKey = "write.parquet.row-group-limit"
ParquetRowGroupLimitDefault = 1048576 // rows per row group
ParquetPageSizeBytesKey = "write.parquet.page-size-bytes"
ParquetPageSizeBytesDefault = 1024 * 1024 // 1 MB
ParquetPageRowLimitKey = "write.parquet.page-row-limit"
ParquetPageRowLimitDefault = 20000
ParquetDictSizeBytesKey = "write.parquet.dict-size-bytes"
ParquetDictSizeBytesDefault = 2 * 1024 * 1024 // 2 MB
ParquetCompressionKey = "write.parquet.compression-codec"
ParquetCompressionDefault = "zstd"
ParquetCompressionLevelKey = "write.parquet.compression-level"
ParquetCompressionLevelDefault = -1
ParquetBloomFilterMaxBytesKey = "write.parquet.bloom-filter-max-bytes"
ParquetBloomFilterMaxBytesDefault = 1024 * 1024 // 1 MB
ParquetBloomFilterColumnEnabledKeyPrefix = "write.parquet.bloom-filter-enabled.column"
)
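
// parquetFormat provides the Parquet implementation of the file-format
// hooks used by the table internals: opening readers, mapping column paths
// to field IDs, deriving writer properties, writing data files, and
// collecting per-column statistics from file metadata.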
type parquetFormat struct{}
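
// Open opens the Parquet file at path and wraps it in a FileReader backed
// by a pqarrow reader allocated from the context's allocator.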
func (parquetFormat) Open(ctx context.Context, fs iceio.IO, path string) (FileReader, error) {
inputfile, err := fs.Open(path)
if err != nil {
return nil, err
}
rdr, err := file.NewParquetReader(inputfile)
if err != nil {
return nil, err
}
alloc := compute.GetAllocator(ctx)
arrRdr, err := pqarrow.NewFileReader(rdr, pqarrow.ArrowReadProperties{}, alloc)
if err != nil {
return nil, err
}
return wrapPqArrowReader{arrRdr}, nil
}
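
// PathToIDMapping maps each leaf's dotted Parquet column path (e.g.
// "col.list.element") to its Iceberg field ID via a pre-order schema visit.
//
// A minimal sketch, assuming a schema with a single int32 field:
//
//	sc := iceberg.NewSchema(0, iceberg.NestedField{ID: 1, Name: "id", Type: iceberg.PrimitiveTypes.Int32})
//	m, _ := parquetFormat{}.PathToIDMapping(sc)
//	// m == map[string]int{"id": 1}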
func (parquetFormat) PathToIDMapping(sc *iceberg.Schema) (map[string]int, error) {
result := make(map[string]int)
paths, err := iceberg.PreOrderVisit(sc, &id2ParquetPathVisitor{})
if err != nil {
return nil, err
}
for _, entry := range paths {
result[entry.path] = entry.fieldID
}
return result, nil
}
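
// createStatsAgg builds a StatsAgg for one column, matching the column's
// Parquet physical type against the physical type implied by its Iceberg
// type; int32->int64 and float->double promotions are tolerated. truncLen
// (from the column's metrics mode) bounds the serialized lower/upper values.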
func (p parquetFormat) createStatsAgg(typ iceberg.PrimitiveType, physicalTypeStr string, truncLen int) (StatsAgg, error) {
expectedPhysical := p.PrimitiveTypeToPhysicalType(typ)
if physicalTypeStr != expectedPhysical {
switch {
case physicalTypeStr == "INT32" && expectedPhysical == "INT64":
case physicalTypeStr == "FLOAT" && expectedPhysical == "DOUBLE":
default:
return nil, fmt.Errorf("unexpected physical type %s for %s, expected %s",
physicalTypeStr, typ, expectedPhysical)
}
}
switch physicalTypeStr {
case "BOOLEAN":
return newStatAgg[bool](typ, truncLen), nil
case "INT32":
switch typ.(type) {
case iceberg.DecimalType:
return &decAsIntAgg[int32]{
newStatAgg[int32](typ, truncLen).(*statsAggregator[int32]),
}, nil
}
return newStatAgg[int32](typ, truncLen), nil
case "INT64":
switch typ.(type) {
case iceberg.DecimalType:
return &decAsIntAgg[int64]{
newStatAgg[int64](typ, truncLen).(*statsAggregator[int64]),
}, nil
}
return newStatAgg[int64](typ, truncLen), nil
case "FLOAT":
return newStatAgg[float32](typ, truncLen), nil
case "DOUBLE":
return newStatAgg[float64](typ, truncLen), nil
case "FIXED_LEN_BYTE_ARRAY":
switch typ.(type) {
case iceberg.UUIDType:
return newStatAgg[uuid.UUID](typ, truncLen), nil
case iceberg.DecimalType:
return newStatAgg[iceberg.Decimal](typ, truncLen), nil
default:
return newStatAgg[[]byte](typ, truncLen), nil
}
case "BYTE_ARRAY":
if typ.Equals(iceberg.PrimitiveTypes.String) {
return newStatAgg[string](typ, truncLen), nil
}
return newStatAgg[[]byte](typ, truncLen), nil
default:
return nil, fmt.Errorf("unsupported physical type: %s", physicalTypeStr)
}
}
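
// PrimitiveTypeToPhysicalType returns the Parquet physical type name used
// to store the given Iceberg primitive type, panicking on an unrecognized
// primitive.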
func (parquetFormat) PrimitiveTypeToPhysicalType(typ iceberg.PrimitiveType) string {
switch typ.(type) {
case iceberg.BooleanType:
return "BOOLEAN"
case iceberg.Int32Type:
return "INT32"
case iceberg.Int64Type:
return "INT64"
case iceberg.Float32Type:
return "FLOAT"
case iceberg.Float64Type:
return "DOUBLE"
case iceberg.DateType:
return "INT32"
case iceberg.TimeType:
return "INT64"
case iceberg.TimestampType:
return "INT64"
case iceberg.TimestampTzType:
return "INT64"
case iceberg.StringType:
return "BYTE_ARRAY"
case iceberg.UUIDType:
return "FIXED_LEN_BYTE_ARRAY"
case iceberg.FixedType:
return "FIXED_LEN_BYTE_ARRAY"
case iceberg.BinaryType:
return "BYTE_ARRAY"
case iceberg.DecimalType:
return "FIXED_LEN_BYTE_ARRAY"
default:
panic(fmt.Errorf("expected primitive type, got: %s", typ))
}
}
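
// GetWriteProperties translates the write.parquet.* table properties into a
// slice of parquet.WriterProperty options for the arrow-go Parquet writer.
//
// A minimal usage sketch (the property value shown is an assumption for
// illustration):
//
//	props := iceberg.Properties{ParquetCompressionKey: "snappy"}
//	opts := parquetFormat{}.GetWriteProperties(props).([]parquet.WriterProperty)
//	writerProps := parquet.NewWriterProperties(opts...)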
func (parquetFormat) GetWriteProperties(props iceberg.Properties) any {
writerProps := []parquet.WriterProperty{
parquet.WithDictionaryDefault(false),
parquet.WithMaxRowGroupLength(int64(props.GetInt(ParquetRowGroupLimitKey,
ParquetRowGroupLimitDefault))),
parquet.WithDataPageSize(int64(props.GetInt(ParquetPageSizeBytesKey,
ParquetPageSizeBytesDefault))),
parquet.WithDataPageVersion(parquet.DataPageV2),
parquet.WithBatchSize(int64(props.GetInt(ParquetPageRowLimitKey,
ParquetPageRowLimitDefault))),
parquet.WithDictionaryPageSizeLimit(int64(props.GetInt(ParquetDictSizeBytesKey,
ParquetDictSizeBytesDefault))),
}
compression := props.Get(ParquetCompressionKey, ParquetCompressionDefault)
compressionLevel := props.GetInt(ParquetCompressionLevelKey,
ParquetCompressionLevelDefault)
var codec compress.Compression
switch compression {
case "snappy":
codec = compress.Codecs.Snappy
case "zstd":
codec = compress.Codecs.Zstd
case "uncompressed":
codec = compress.Codecs.Uncompressed
case "gzip":
codec = compress.Codecs.Gzip
case "brotli":
codec = compress.Codecs.Brotli
case "lz4":
codec = compress.Codecs.Lz4
case "lz4raw":
codec = compress.Codecs.Lz4Raw
case "lzo":
codec = compress.Codecs.Lzo
default:
// unrecognized codec name: fall through with the zero value
// (uncompressed); ideally this should log a warning
}
return append(writerProps, parquet.WithCompression(codec),
parquet.WithCompressionLevel(compressionLevel))
}
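
// WriteDataFile writes the given record batches to a new Parquet file at
// info.FileName and returns the corresponding iceberg.DataFile, including
// column statistics derived from the written file's metadata. All batches
// must share the schema of batches[0].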
func (p parquetFormat) WriteDataFile(ctx context.Context, fs iceio.WriteFileIO, info WriteFileInfo, batches []arrow.Record) (iceberg.DataFile, error) {
fw, err := fs.Create(info.FileName)
if err != nil {
return nil, err
}
defer fw.Close()
cntWriter := internal.CountingWriter{W: fw}
mem := compute.GetAllocator(ctx)
writerProps := parquet.NewWriterProperties(info.WriteProps.([]parquet.WriterProperty)...)
arrProps := pqarrow.NewArrowWriterProperties(pqarrow.WithAllocator(mem), pqarrow.WithStoreSchema())
writer, err := pqarrow.NewFileWriter(batches[0].Schema(), &cntWriter, writerProps, arrProps)
if err != nil {
return nil, err
}
for _, batch := range batches {
if err := writer.WriteBuffered(batch); err != nil {
return nil, err
}
}
if err := writer.Close(); err != nil {
return nil, err
}
filemeta, err := writer.FileMetadata()
if err != nil {
return nil, err
}
colMapping, err := p.PathToIDMapping(info.FileSchema)
if err != nil {
return nil, err
}
return p.DataFileStatsFromMeta(filemeta, info.StatsCols, colMapping).
ToDataFile(info.FileSchema, info.Spec, info.FileName, iceberg.ParquetFile, cntWriter.Count), nil
}
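
// decAsIntAgg aggregates statistics for decimal columns stored physically
// as int32/int64, rescaling the raw integer min/max into iceberg.Decimal
// bounds when serialized.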
type decAsIntAgg[T int32 | int64] struct {
*statsAggregator[T]
}
func (s *decAsIntAgg[T]) MinAsBytes() ([]byte, error) {
if s.curMin == nil {
return nil, nil
}
lit := iceberg.DecimalLiteral(iceberg.Decimal{
Val: decimal128.FromI64(int64(s.curMin.Value())),
Scale: s.primitiveType.(iceberg.DecimalType).Scale(),
})
if s.truncLen > 0 {
return s.toBytes((&iceberg.TruncateTransform{Width: s.truncLen}).
Apply(iceberg.Optional[iceberg.Literal]{Valid: true, Val: lit}).Val)
}
return s.toBytes(lit)
}
func (s *decAsIntAgg[T]) MaxAsBytes() ([]byte, error) {
if s.curMax == nil {
return nil, nil
}
lit := iceberg.DecimalLiteral(iceberg.Decimal{
Val: decimal128.FromI64(int64(s.curMax.Value())),
Scale: s.primitiveType.(iceberg.DecimalType).Scale(),
})
if s.truncLen <= 0 {
return s.toBytes(lit)
}
return nil, fmt.Errorf("%s cannot be truncated for upper bound", s.primitiveType)
}
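
// The wrapped*Stats adapters below re-type the min/max accessors of the
// concrete parquet metadata statistics so they return the Go types the
// generic aggregators expect ([]byte, string, uuid.UUID, iceberg.Decimal).
// wrappedStringStats reinterprets the underlying bytes via unsafe.String to
// avoid a copy.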
type wrappedBinaryStats struct {
*metadata.ByteArrayStatistics
}
func (w *wrappedBinaryStats) Min() []byte {
return w.ByteArrayStatistics.Min()
}
func (w *wrappedBinaryStats) Max() []byte {
return w.ByteArrayStatistics.Max()
}
type wrappedStringStats struct {
*metadata.ByteArrayStatistics
}
func (w *wrappedStringStats) Min() string {
data := w.ByteArrayStatistics.Min()
return unsafe.String(unsafe.SliceData(data), len(data))
}
func (w *wrappedStringStats) Max() string {
data := w.ByteArrayStatistics.Max()
return unsafe.String(unsafe.SliceData(data), len(data))
}
type wrappedUUIDStats struct {
*metadata.FixedLenByteArrayStatistics
}
func (w *wrappedUUIDStats) Min() uuid.UUID {
uid, err := uuid.FromBytes(w.FixedLenByteArrayStatistics.Min())
if err != nil {
panic(err)
}
return uid
}
func (w *wrappedUUIDStats) Max() uuid.UUID {
uid, err := uuid.FromBytes(w.FixedLenByteArrayStatistics.Max())
if err != nil {
panic(err)
}
return uid
}
type wrappedFLBAStats struct {
*metadata.FixedLenByteArrayStatistics
}
func (w *wrappedFLBAStats) Min() []byte {
return w.FixedLenByteArrayStatistics.Min()
}
func (w *wrappedFLBAStats) Max() []byte {
return w.FixedLenByteArrayStatistics.Max()
}
type wrappedDecStats struct {
*metadata.FixedLenByteArrayStatistics
scale int
}
func (w wrappedDecStats) Min() iceberg.Decimal {
dec, err := BigEndianToDecimal(w.FixedLenByteArrayStatistics.Min())
if err != nil {
panic(err)
}
return iceberg.Decimal{Val: dec, Scale: w.scale}
}
func (w wrappedDecStats) Max() iceberg.Decimal {
dec, err := BigEndianToDecimal(w.FixedLenByteArrayStatistics.Max())
if err != nil {
panic(err)
}
return iceberg.Decimal{Val: dec, Scale: w.scale}
}
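
// DataFileStatsFromMeta walks every row group and column chunk of the
// Parquet file metadata, accumulating per-field compressed sizes, value and
// null counts, row-group split offsets, and min/max aggregates according to
// each column's metrics mode. Columns with missing or unreadable statistics
// are invalidated and dropped from the null-count and bounds maps.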
func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
pqmeta := meta.(*metadata.FileMetaData)
var (
colSizes = make(map[int]int64)
valueCounts = make(map[int]int64)
splitOffsets = make([]int64, 0)
nullValueCounts = make(map[int]int64)
nanValueCounts = make(map[int]int64)
invalidateCol = make(map[int]struct{})
colAggs = make(map[int]StatsAgg)
)
for rg := range pqmeta.NumRowGroups() {
// reference: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L2285
rowGroup := pqmeta.RowGroup(rg)
colChunk, err := rowGroup.ColumnChunk(0)
if err != nil {
panic(err)
}
dataOffset, dictOffset := colChunk.DataPageOffset(), colChunk.DictionaryPageOffset()
if colChunk.HasDictionaryPage() && dictOffset < dataOffset {
splitOffsets = append(splitOffsets, dictOffset)
} else {
splitOffsets = append(splitOffsets, dataOffset)
}
for pos := range rowGroup.NumColumns() {
colChunk, err = rowGroup.ColumnChunk(pos)
if err != nil {
panic(err)
}
fieldID := colMapping[colChunk.PathInSchema().String()]
statsCol := statsCols[fieldID]
if statsCol.Mode.Typ == MetricModeNone {
continue
}
colSizes[fieldID] += colChunk.TotalCompressedSize()
valueCounts[fieldID] += colChunk.NumValues()
set, err := colChunk.StatsSet()
if err != nil {
panic(err)
}
if !set {
invalidateCol[fieldID] = struct{}{}
continue
}
stats, err := colChunk.Statistics()
if err != nil {
invalidateCol[fieldID] = struct{}{}
continue
}
if stats.HasNullCount() {
nullValueCounts[fieldID] += stats.NullCount()
}
if statsCol.Mode.Typ == MetricModeCounts || !stats.HasMinMax() {
continue
}
agg, ok := colAggs[fieldID]
if !ok {
agg, err = p.createStatsAgg(statsCol.IcebergTyp, stats.Type().String(), statsCol.Mode.Len)
if err != nil {
panic(err)
}
colAggs[fieldID] = agg
}
switch t := statsCol.IcebergTyp.(type) {
case iceberg.BinaryType:
stats = &wrappedBinaryStats{stats.(*metadata.ByteArrayStatistics)}
case iceberg.UUIDType:
stats = &wrappedUUIDStats{stats.(*metadata.FixedLenByteArrayStatistics)}
case iceberg.StringType:
stats = &wrappedStringStats{stats.(*metadata.ByteArrayStatistics)}
case iceberg.FixedType:
stats = &wrappedFLBAStats{stats.(*metadata.FixedLenByteArrayStatistics)}
case iceberg.DecimalType:
stats = &wrappedDecStats{stats.(*metadata.FixedLenByteArrayStatistics), t.Scale()}
}
agg.Update(stats)
}
}
slices.Sort(splitOffsets)
maps.DeleteFunc(nullValueCounts, func(fieldID int, _ int64) bool {
_, ok := invalidateCol[fieldID]
return ok
})
maps.DeleteFunc(colAggs, func(fieldID int, _ StatsAgg) bool {
_, ok := invalidateCol[fieldID]
return ok
})
return &DataFileStatistics{
RecordCount: pqmeta.GetNumRows(),
ColSizes: colSizes,
ValueCounts: valueCounts,
NullValueCounts: nullValueCounts,
NanValueCounts: nanValueCounts,
SplitOffsets: splitOffsets,
ColAggs: colAggs,
}
}
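
// ParquetFileSource creates readers for a single Parquet iceberg.DataFile
// using the supplied IO and allocator.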
type ParquetFileSource struct {
mem memory.Allocator
fs iceio.IO
file iceberg.DataFile
}
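
// wrapPqArrowReader adapts *pqarrow.FileReader to the FileReader interface.
// GetRecords accepts an optional tester of type
// func(*metadata.RowGroupMetaData, []int) (bool, error) used to skip row
// groups that cannot match a filter.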
type wrapPqArrowReader struct {
*pqarrow.FileReader
}
func (w wrapPqArrowReader) Metadata() Metadata {
return w.ParquetReader().MetaData()
}
func (w wrapPqArrowReader) SourceFileSize() int64 {
return w.ParquetReader().MetaData().GetSourceFileSize()
}
func (w wrapPqArrowReader) Close() error {
return w.ParquetReader().Close()
}
func (w wrapPqArrowReader) PrunedSchema(projectedIDs map[int]struct{}, mapping iceberg.NameMapping) (*arrow.Schema, []int, error) {
return pruneParquetColumns(w.Manifest, projectedIDs, false, mapping)
}
func (w wrapPqArrowReader) GetRecords(ctx context.Context, cols []int, tester any) (array.RecordReader, error) {
var (
testRg func(*metadata.RowGroupMetaData, []int) (bool, error)
ok bool
)
if tester != nil {
testRg, ok = tester.(func(*metadata.RowGroupMetaData, []int) (bool, error))
if !ok {
return nil, fmt.Errorf("%w: invalid tester function", iceberg.ErrInvalidArgument)
}
}
var rgList []int
if testRg != nil {
rgList = make([]int, 0)
fileMeta, numRg := w.ParquetReader().MetaData(), w.ParquetReader().NumRowGroups()
for rg := 0; rg < numRg; rg++ {
rgMeta := fileMeta.RowGroup(rg)
use, err := testRg(rgMeta, cols)
if err != nil {
return nil, err
}
if use {
rgList = append(rgList, rg)
}
}
}
return w.GetRecordReader(ctx, cols, rgList)
}
func (pfs *ParquetFileSource) GetReader(ctx context.Context) (FileReader, error) {
pf, err := pfs.fs.Open(pfs.file.FilePath())
if err != nil {
return nil, err
}
rdr, err := file.NewParquetReader(pf,
file.WithReadProps(parquet.NewReaderProperties(pfs.mem)))
if err != nil {
return nil, err
}
// TODO: grab these from the context
arrProps := pqarrow.ArrowReadProperties{
Parallel: true,
BatchSize: 1 << 17,
}
if pfs.file.ContentType() == iceberg.EntryContentPosDeletes {
// request dictionary decoding for the file path column
arrProps.SetReadDict(0, true)
}
fr, err := pqarrow.NewFileReader(rdr, arrProps, pfs.mem)
if err != nil {
return nil, err
}
return wrapPqArrowReader{fr}, nil
}
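
// manifestVisitor is a generic visitor over a pqarrow.SchemaManifest, with
// one callback per field shape (struct, list, map, primitive) plus hooks
// for the manifest itself and each visited field.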
type manifestVisitor[T any] interface {
Manifest(*pqarrow.SchemaManifest, []T, *iceberg.MappedField) T
Field(pqarrow.SchemaField, T, *iceberg.MappedField) T
Struct(pqarrow.SchemaField, []T, *iceberg.MappedField) T
List(pqarrow.SchemaField, T, *iceberg.MappedField) T
Map(pqarrow.SchemaField, T, T, *iceberg.MappedField) T
Primitive(pqarrow.SchemaField, *iceberg.MappedField) T
}
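
// visitParquetManifest applies visitor to every top-level field of the
// manifest, threading the optional name mapping down so field IDs can be
// resolved for files written without embedded IDs. Panics raised while
// visiting are recovered and returned as errors.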
func visitParquetManifest[T any](manifest *pqarrow.SchemaManifest, visitor manifestVisitor[T], mapping *iceberg.MappedField) (res T, err error) {
if manifest == nil {
err = fmt.Errorf("%w: cannot visit nil manifest", iceberg.ErrInvalidArgument)
return
}
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%s", r)
}
}()
var fieldMap *iceberg.MappedField
results := make([]T, len(manifest.Fields))
for i, f := range manifest.Fields {
if mapping != nil {
fieldMap = mapping.GetField(f.Field.Name)
}
res := visitManifestField(f, visitor, fieldMap)
results[i] = visitor.Field(f, res, fieldMap)
}
return visitor.Manifest(manifest, results, mapping), nil
}
func visitParquetManifestStruct[T any](field pqarrow.SchemaField, visitor manifestVisitor[T], mapping *iceberg.MappedField) T {
results := make([]T, len(field.Children))
var fieldMap *iceberg.MappedField
for i, f := range field.Children {
if mapping != nil {
fieldMap = mapping.GetField(f.Field.Name)
}
res := visitManifestField(f, visitor, fieldMap)
results[i] = visitor.Field(f, res, fieldMap)
}
return visitor.Struct(field, results, mapping)
}
func visitManifestList[T any](field pqarrow.SchemaField, visitor manifestVisitor[T], mapping *iceberg.MappedField) T {
elemField := field.Children[0]
var elemMapping *iceberg.MappedField
if mapping != nil {
elemMapping = mapping.GetField("element")
}
res := visitManifestField(elemField, visitor, elemMapping)
return visitor.List(field, res, mapping)
}
func visitManifestMap[T any](field pqarrow.SchemaField, visitor manifestVisitor[T], mapping *iceberg.MappedField) T {
kvfield := field.Children[0]
keyField, valField := kvfield.Children[0], kvfield.Children[1]
var keyMapping, valMapping *iceberg.MappedField
if mapping != nil {
keyMapping = mapping.GetField("key")
valMapping = mapping.GetField("value")
}
return visitor.Map(field, visitManifestField(keyField, visitor, keyMapping), visitManifestField(valField, visitor, valMapping), mapping)
}
func visitManifestField[T any](field pqarrow.SchemaField, visitor manifestVisitor[T], mapping *iceberg.MappedField) T {
switch field.Field.Type.(type) {
case *arrow.StructType:
return visitParquetManifestStruct(field, visitor, mapping)
case *arrow.MapType:
return visitManifestMap(field, visitor, mapping)
case arrow.ListLikeType:
return visitManifestList(field, visitor, mapping)
default:
return visitor.Primitive(field, mapping)
}
}
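
// pruneParquetColumns projects the manifest's Arrow schema down to the
// fields whose IDs appear in selected, returning the pruned schema together
// with the leaf column indices that must be read from the file.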
func pruneParquetColumns(manifest *pqarrow.SchemaManifest, selected map[int]struct{}, selectFullTypes bool, mapping iceberg.NameMapping) (*arrow.Schema, []int, error) {
visitor := &pruneParquetSchema{
selected: selected,
manifest: manifest,
fullTypes: selectFullTypes,
indices: []int{},
}
result, err := visitParquetManifest(manifest, visitor, &iceberg.MappedField{Fields: mapping})
if err != nil {
return nil, nil, err
}
return arrow.NewSchema(result.Type.(*arrow.StructType).Fields(), &result.Metadata),
visitor.indices, nil
}
func getFieldID(f arrow.Field) *int {
if !f.HasMetadata() {
return nil
}
fieldIDStr, ok := f.Metadata.GetValue("PARQUET:field_id")
if !ok {
return nil
}
id, err := strconv.Atoi(fieldIDStr)
if err != nil {
return nil
}
return &id
}
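
// pruneParquetSchema implements manifestVisitor to perform the projection:
// selected fields are kept (with their full types if fullTypes is set),
// nested list/map/struct types are rebuilt around their projected children,
// and the physical indices of selected leaves are accumulated in indices.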
type pruneParquetSchema struct {
selected map[int]struct{}
fullTypes bool
manifest *pqarrow.SchemaManifest
indices []int
}
func (p *pruneParquetSchema) fieldID(field arrow.Field, mapping *iceberg.MappedField) int {
if mapping != nil && mapping.FieldID != nil {
return *mapping.FieldID
}
if id := getFieldID(field); id != nil {
return *id
}
panic(fmt.Errorf("%w: cannot convert %s to Iceberg field, missing field_id",
iceberg.ErrInvalidSchema, field))
}
func (p *pruneParquetSchema) Manifest(manifest *pqarrow.SchemaManifest, fields []arrow.Field, _ *iceberg.MappedField) arrow.Field {
finalFields := slices.DeleteFunc(fields, func(f arrow.Field) bool { return f.Type == nil })
result := arrow.Field{
Type: arrow.StructOf(finalFields...),
}
if manifest.SchemaMeta != nil {
result.Metadata = *manifest.SchemaMeta
}
return result
}
func (p *pruneParquetSchema) Struct(field pqarrow.SchemaField, children []arrow.Field, _ *iceberg.MappedField) arrow.Field {
selected, fields := []arrow.Field{}, field.Children
sameType := true
for i, t := range children {
field := fields[i]
if arrow.TypeEqual(field.Field.Type, t.Type) {
selected = append(selected, *field.Field)
} else if t.Type != nil {
sameType = false
// type has changed, create a new field with the projected type
selected = append(selected, arrow.Field{
Name: field.Field.Name,
Type: t.Type,
Nullable: field.Field.Nullable,
Metadata: field.Field.Metadata,
})
}
}
if len(selected) > 0 {
if len(selected) == len(fields) && sameType {
// nothing changed, return the original
return *field.Field
} else {
result := *field.Field
result.Type = arrow.StructOf(selected...)
return result
}
}
return arrow.Field{}
}
func (p *pruneParquetSchema) Field(field pqarrow.SchemaField, result arrow.Field, mapping *iceberg.MappedField) arrow.Field {
_, ok := p.selected[p.fieldID(*field.Field, mapping)]
if !ok {
if result.Type != nil {
return result
}
return arrow.Field{}
}
if p.fullTypes {
return *field.Field
}
if _, ok := field.Field.Type.(*arrow.StructType); ok {
result := *field.Field
result.Type = p.projectSelectedStruct(result.Type)
return result
}
if !field.IsLeaf() {
panic(errors.New("cannot explicitly project list or map types"))
}
p.indices = append(p.indices, field.ColIndex)
return *field.Field
}
func (p *pruneParquetSchema) List(field pqarrow.SchemaField, elemResult arrow.Field, mapping *iceberg.MappedField) arrow.Field {
var elemMapping *iceberg.MappedField
if mapping != nil {
elemMapping = mapping.GetField("element")
}
_, ok := p.selected[p.fieldID(*field.Children[0].Field, elemMapping)]
if !ok {
if elemResult.Type != nil {
result := *field.Field
result.Type = p.projectList(field.Field.Type.(arrow.ListLikeType), elemResult.Type)
return result
}
return arrow.Field{}
}
if p.fullTypes {
return *field.Field
}
if _, ok = field.Children[0].Field.Type.(*arrow.StructType); ok {
result := *field.Field
projected := p.projectSelectedStruct(elemResult.Type)
result.Type = p.projectList(field.Field.Type.(arrow.ListLikeType), projected)
return result
}
if !field.Children[0].IsLeaf() {
panic(errors.New("cannot explicitly project list or map types"))
}
p.indices = append(p.indices, field.Children[0].ColIndex)
return *field.Field
}
func (p *pruneParquetSchema) Map(field pqarrow.SchemaField, keyResult, valResult arrow.Field, mapping *iceberg.MappedField) arrow.Field {
var valMapping *iceberg.MappedField
if mapping != nil {
valMapping = mapping.GetField("value")
}
_, ok := p.selected[p.fieldID(*field.Children[0].Children[1].Field, valMapping)]
if !ok {
if valResult.Type != nil {
result := *field.Field
result.Type = p.projectMap(field.Field.Type.(*arrow.MapType), valResult.Type)
return result
}
return arrow.Field{}
}
if p.fullTypes {
return *field.Field
}
if _, ok = field.Children[0].Children[1].Field.Type.(*arrow.StructType); ok {
result := *field.Field
projected := p.projectSelectedStruct(valResult.Type)
result.Type = p.projectMap(field.Field.Type.(*arrow.MapType), projected)
return result
}
if !field.Children[0].Children[1].IsLeaf() {
panic("cannot explicitly project list or map types")
}
p.indices = append(p.indices, field.Children[0].Children[0].ColIndex)
p.indices = append(p.indices, field.Children[0].Children[1].ColIndex)
return *field.Field
}
func (p *pruneParquetSchema) Primitive(_ pqarrow.SchemaField, _ *iceberg.MappedField) arrow.Field {
return arrow.Field{}
}
func (p *pruneParquetSchema) projectSelectedStruct(projected arrow.DataType) *arrow.StructType {
if projected == nil {
return &arrow.StructType{}
}
if ty, ok := projected.(*arrow.StructType); ok {
return ty
}
panic("expected a struct")
}
func (p *pruneParquetSchema) projectList(listType arrow.ListLikeType, elemResult arrow.DataType) arrow.ListLikeType {
if arrow.TypeEqual(listType.Elem(), elemResult) {
return listType
}
origField := listType.ElemField()
origField.Type = elemResult
switch listType.(type) {
case *arrow.ListType:
return arrow.ListOfField(origField)
case *arrow.LargeListType:
return arrow.LargeListOfField(origField)
case *arrow.ListViewType:
return arrow.ListViewOfField(origField)
}
n := listType.(*arrow.FixedSizeListType).Len()
return arrow.FixedSizeListOfField(n, origField)
}
func (p *pruneParquetSchema) projectMap(m *arrow.MapType, valResult arrow.DataType) *arrow.MapType {
if arrow.TypeEqual(m.ItemType(), valResult) {
return m
}
return arrow.MapOf(m.KeyType(), valResult)
}
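
// id2ParquetPath pairs an Iceberg field ID with the dotted Parquet column
// path (e.g. "a.b.list.element") where that field's values live.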
type id2ParquetPath struct {
fieldID int
path string
}
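
// id2ParquetPathVisitor accumulates id2ParquetPath entries during a
// pre-order schema traversal, tracking the current path components and the
// field ID of the leaf about to be emitted.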
type id2ParquetPathVisitor struct {
fieldID int
path []string
}
func (v *id2ParquetPathVisitor) Schema(_ *iceberg.Schema, res func() []id2ParquetPath) []id2ParquetPath {
return res()
}
func (v *id2ParquetPathVisitor) Struct(_ iceberg.StructType, results []func() []id2ParquetPath) []id2ParquetPath {
result := make([]id2ParquetPath, 0, len(results))
for _, res := range results {
result = append(result, res()...)
}
return result
}
func (v *id2ParquetPathVisitor) Field(field iceberg.NestedField, res func() []id2ParquetPath) []id2ParquetPath {
v.fieldID = field.ID
v.path = append(v.path, field.Name)
result := res()
v.path = v.path[:len(v.path)-1]
return result
}
func (v *id2ParquetPathVisitor) List(listType iceberg.ListType, elemResult func() []id2ParquetPath) []id2ParquetPath {
v.fieldID = listType.ElementID
v.path = append(v.path, "list")
result := elemResult()
v.path = v.path[:len(v.path)-1]
return result
}
func (v *id2ParquetPathVisitor) Map(m iceberg.MapType, keyResult func() []id2ParquetPath, valResult func() []id2ParquetPath) []id2ParquetPath {
v.fieldID = m.KeyID
v.path = append(v.path, "key_value")
keyRes := keyResult()
v.path = v.path[:len(v.path)-1]
v.fieldID = m.ValueID
v.path = append(v.path, "key_value")
valRes := valResult()
v.path = v.path[:len(v.path)-1]
return append(keyRes, valRes...)
}
func (v *id2ParquetPathVisitor) Primitive(iceberg.PrimitiveType) []id2ParquetPath {
return []id2ParquetPath{{fieldID: v.fieldID, path: strings.Join(v.path, ".")}}
}