// Excerpted from table/internal/parquet_files.go (lines 402-518).

// DataFileStatsFromMeta extracts per-column statistics from a Parquet file's
// footer metadata and aggregates them into a single DataFileStatistics value
// suitable for an Iceberg data-file entry.
//
// meta must be a *metadata.FileMetaData (the method type-asserts and will
// panic otherwise). statsCols maps an Iceberg field ID to its configured
// metrics-collection mode, and colMapping maps a column's dotted Parquet
// path to that field ID.
//
// Behavior per column chunk:
//   - MetricModeNone columns are skipped entirely.
//   - Compressed sizes and value counts are always accumulated for tracked
//     columns.
//   - If a chunk's statistics are absent or unreadable, the field is marked
//     invalid; its null counts and min/max aggregates are discarded at the
//     end (sizes/value counts are kept, matching the pyiceberg reference).
//   - MetricModeCounts columns skip min/max aggregation.
//
// NOTE(review): errors from the metadata accessors are surfaced via panic
// because this interface has no error return; the footer is assumed to have
// already been parsed successfully.
func (p parquetFormat) DataFileStatsFromMeta(meta Metadata, statsCols map[int]StatisticsCollector, colMapping map[string]int) *DataFileStatistics {
	pqmeta := meta.(*metadata.FileMetaData)
	var (
		colSizes        = make(map[int]int64)
		valueCounts     = make(map[int]int64)
		nullValueCounts = make(map[int]int64)
		nanValueCounts  = make(map[int]int64)
		invalidateCol   = make(map[int]struct{})
		colAggs         = make(map[int]StatsAgg)
		// Exactly one split offset is recorded per row group, so the final
		// length is known up front; pre-size to avoid growth copies.
		splitOffsets = make([]int64, 0, pqmeta.NumRowGroups())
	)

	for rg := range pqmeta.NumRowGroups() {
		// reference: https://github.com/apache/iceberg-python/blob/main/pyiceberg/io/pyarrow.py#L2285
		rowGroup := pqmeta.RowGroup(rg)
		colChunk, err := rowGroup.ColumnChunk(0)
		if err != nil {
			panic(err)
		}

		// A row group's split offset is where its bytes begin: the
		// dictionary page when one precedes the data pages, otherwise the
		// first data page.
		dataOffset, dictOffset := colChunk.DataPageOffset(), colChunk.DictionaryPageOffset()
		if colChunk.HasDictionaryPage() && dictOffset < dataOffset {
			splitOffsets = append(splitOffsets, dictOffset)
		} else {
			splitOffsets = append(splitOffsets, dataOffset)
		}

		for pos := range rowGroup.NumColumns() {
			colChunk, err = rowGroup.ColumnChunk(pos)
			if err != nil {
				panic(err)
			}

			// Resolve the Parquet column path to its Iceberg field ID and
			// look up the metrics mode configured for that field.
			fieldID := colMapping[colChunk.PathInSchema().String()]
			statsCol := statsCols[fieldID]
			if statsCol.Mode.Typ == MetricModeNone {
				continue
			}

			colSizes[fieldID] += colChunk.TotalCompressedSize()
			valueCounts[fieldID] += colChunk.NumValues()
			set, err := colChunk.StatsSet()
			if err != nil {
				panic(err)
			}

			if !set {
				// No statistics written for this chunk: min/max and null
				// counts for the field can no longer be trusted.
				invalidateCol[fieldID] = struct{}{}

				continue
			}

			stats, err := colChunk.Statistics()
			if err != nil {
				// Unreadable statistics are treated the same as missing ones.
				invalidateCol[fieldID] = struct{}{}

				continue
			}

			if stats.HasNullCount() {
				nullValueCounts[fieldID] += stats.NullCount()
			}

			// Counts-only mode, or chunks without min/max, contribute nothing
			// to the bounds aggregation.
			if statsCol.Mode.Typ == MetricModeCounts || !stats.HasMinMax() {
				continue
			}

			// Lazily create one aggregator per field the first time usable
			// min/max statistics are seen.
			agg, ok := colAggs[fieldID]
			if !ok {
				agg, err = p.createStatsAgg(statsCol.IcebergTyp, stats.Type().String(), statsCol.Mode.Len)
				if err != nil {
					panic(err)
				}

				colAggs[fieldID] = agg
			}

			// Wrap the raw Parquet statistics so min/max values are decoded
			// according to the Iceberg logical type (e.g. FLBA bytes as a
			// UUID, or unscaled decimal bytes with the field's scale).
			switch t := statsCol.IcebergTyp.(type) {
			case iceberg.BinaryType:
				stats = &wrappedBinaryStats{stats.(*metadata.ByteArrayStatistics)}
			case iceberg.UUIDType:
				stats = &wrappedUUIDStats{stats.(*metadata.FixedLenByteArrayStatistics)}
			case iceberg.StringType:
				stats = &wrappedStringStats{stats.(*metadata.ByteArrayStatistics)}
			case iceberg.FixedType:
				stats = &wrappedFLBAStats{stats.(*metadata.FixedLenByteArrayStatistics)}
			case iceberg.DecimalType:
				stats = &wrappedDecStats{stats.(*metadata.FixedLenByteArrayStatistics), t.Scale()}
			}

			agg.Update(stats)
		}

	}

	// Split offsets must be ascending, and any field that ever lacked valid
	// statistics is stripped of its null counts and bounds aggregates.
	slices.Sort(splitOffsets)
	maps.DeleteFunc(nullValueCounts, func(fieldID int, _ int64) bool {
		_, ok := invalidateCol[fieldID]

		return ok
	})
	maps.DeleteFunc(colAggs, func(fieldID int, _ StatsAgg) bool {
		_, ok := invalidateCol[fieldID]

		return ok
	})

	return &DataFileStatistics{
		RecordCount:     pqmeta.GetNumRows(),
		ColSizes:        colSizes,
		ValueCounts:     valueCounts,
		NullValueCounts: nullValueCounts,
		NanValueCounts:  nanValueCounts,
		SplitOffsets:    splitOffsets,
		ColAggs:         colAggs,
	}
}