in table/internal/parquet_files.go [402:518]
// DataFileStatsFromMeta derives Iceberg data-file statistics from a Parquet
// file's footer metadata. For each column chunk it accumulates compressed
// sizes, value counts, null counts, and min/max aggregates keyed by Iceberg
// field ID (resolved through colMapping), honoring the per-column metrics
// mode in statsCols (None = skip entirely, Counts = skip min/max bounds).
//
// meta must be a *metadata.FileMetaData; the type assertion (and any footer
// read error) panics, matching this file's handling of malformed metadata.
// Columns whose Parquet statistics are absent or unreadable are recorded in
// invalidateCol and have their null counts and aggregates dropped before
// returning — sizes and value counts are intentionally kept, mirroring the
// pyiceberg reference implementation linked below.
func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
	pqmeta := meta.(*metadata.FileMetaData)
	var (
		colSizes        = make(map[int]int64)
		valueCounts     = make(map[int]int64)
		splitOffsets    = make([]int64, 0)
		nullValueCounts = make(map[int]int64)
		nanValueCounts  = make(map[int]int64)
		// field IDs whose stats were missing/unreadable; used to prune
		// nullValueCounts and colAggs after the scan.
		invalidateCol = make(map[int]struct{})
		colAggs       = make(map[int]StatsAgg)
	)
	for rg := range pqmeta.NumRowGroups() {
		// reference: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L2285
		rowGroup := pqmeta.RowGroup(rg)
		// The row group's split offset is taken from its first column chunk:
		// the dictionary page offset when a dictionary page physically
		// precedes the data pages, otherwise the first data page offset.
		colChunk, err := rowGroup.ColumnChunk(0)
		if err != nil {
			panic(err)
		}
		dataOffset, dictOffset := colChunk.DataPageOffset(), colChunk.DictionaryPageOffset()
		if colChunk.HasDictionaryPage() && dictOffset < dataOffset {
			splitOffsets = append(splitOffsets, dictOffset)
		} else {
			splitOffsets = append(splitOffsets, dataOffset)
		}
		for pos := range rowGroup.NumColumns() {
			colChunk, err = rowGroup.ColumnChunk(pos)
			if err != nil {
				panic(err)
			}
			// Map the Parquet column path to its Iceberg field ID to look up
			// the configured metrics mode for this column.
			fieldID := colMapping[colChunk.PathInSchema().String()]
			statsCol := statsCols[fieldID]
			if statsCol.Mode.Typ == MetricModeNone {
				continue
			}
			// Sizes and value counts accumulate regardless of whether the
			// chunk carries statistics; they are never invalidated below.
			colSizes[fieldID] += colChunk.TotalCompressedSize()
			valueCounts[fieldID] += colChunk.NumValues()
			set, err := colChunk.StatsSet()
			if err != nil {
				panic(err)
			}
			if !set {
				// No statistics present for this chunk: the column's null
				// counts/aggregates become untrustworthy for the whole file.
				invalidateCol[fieldID] = struct{}{}
				continue
			}
			stats, err := colChunk.Statistics()
			if err != nil {
				// Unreadable stats are treated the same as absent stats
				// rather than panicking: invalidate and move on.
				invalidateCol[fieldID] = struct{}{}
				continue
			}
			if stats.HasNullCount() {
				nullValueCounts[fieldID] += stats.NullCount()
			}
			// Counts mode stops here; bounds also require min/max to be set.
			if statsCol.Mode.Typ == MetricModeCounts || !stats.HasMinMax() {
				continue
			}
			// Lazily create one aggregator per field, typed for the Iceberg
			// column type and truncated to the configured metrics length.
			agg, ok := colAggs[fieldID]
			if !ok {
				agg, err = p.createStatsAgg(statsCol.IcebergTyp, stats.Type().String(), statsCol.Mode.Len)
				if err != nil {
					panic(err)
				}
				colAggs[fieldID] = agg
			}
			// Wrap the raw Parquet statistics so min/max values are exposed
			// in the Iceberg representation for these logical types (e.g.
			// decimals carry the column's scale). Other types pass through.
			switch t := statsCol.IcebergTyp.(type) {
			case iceberg.BinaryType:
				stats = &wrappedBinaryStats{stats.(*metadata.ByteArrayStatistics)}
			case iceberg.UUIDType:
				stats = &wrappedUUIDStats{stats.(*metadata.FixedLenByteArrayStatistics)}
			case iceberg.StringType:
				stats = &wrappedStringStats{stats.(*metadata.ByteArrayStatistics)}
			case iceberg.FixedType:
				stats = &wrappedFLBAStats{stats.(*metadata.FixedLenByteArrayStatistics)}
			case iceberg.DecimalType:
				stats = &wrappedDecStats{stats.(*metadata.FixedLenByteArrayStatistics), t.Scale()}
			}
			agg.Update(stats)
		}
	}
	// Iceberg requires split offsets in ascending order.
	slices.Sort(splitOffsets)
	// Drop null counts and aggregates for columns flagged as invalid; sizes
	// and value counts are deliberately retained (see loop above).
	maps.DeleteFunc(nullValueCounts, func(fieldID int, _ int64) bool {
		_, ok := invalidateCol[fieldID]
		return ok
	})
	maps.DeleteFunc(colAggs, func(fieldID int, _ StatsAgg) bool {
		_, ok := invalidateCol[fieldID]
		return ok
	})
	// NOTE(review): nanValueCounts is never populated here — Parquet footers
	// do not record NaN counts, so it is returned empty by design (matches
	// the pyiceberg reference); confirm downstream consumers expect that.
	return &DataFileStatistics{
		RecordCount:     pqmeta.GetNumRows(),
		ColSizes:        colSizes,
		ValueCounts:     valueCounts,
		NullValueCounts: nullValueCounts,
		NanValueCounts:  nanValueCounts,
		SplitOffsets:    splitOffsets,
		ColAggs:         colAggs,
	}
}