in table/scanner.go [328:387]
// collectManifestEntries opens the manifests in manifestList that pass the
// scan's sequence-number check and records each entry they contain in a
// manifestEntries collection: data files via addDataEntry and positional
// delete files via addPositionalDeleteEntry.
//
// Manifests are opened concurrently in an errgroup bounded by the smaller of
// scan.concurrency and len(manifestList). Once ctx is cancelled, or once any
// manifest fails, workers that have not yet started return immediately
// instead of opening their manifest.
//
// It returns an error if the metrics evaluator cannot be built, if a manifest
// cannot be opened, if an equality-delete entry is encountered (not yet
// supported), if an entry has an unknown content type (wrapped with
// ErrInvalidMetadata), or if ctx is cancelled before a worker starts.
func (scan *Scan) collectManifestEntries(
	ctx context.Context,
	manifestList []iceberg.ManifestFile,
) (*manifestEntries, error) {
	metricsEval, err := newInclusiveMetricsEvaluator(
		scan.metadata.CurrentSchema(),
		scan.rowFilter,
		scan.caseSensitive,
		scan.options["include_empty_files"] == "true",
	)
	if err != nil {
		return nil, err
	}

	minSeqNum := minSequenceNum(manifestList)

	concurrencyLimit := min(scan.concurrency, len(manifestList))
	if concurrencyLimit == 0 {
		// errgroup treats a limit of zero as "never start a goroutine", which
		// would make the first g.Go call below block forever. Fall back to
		// serial execution; a negative value (no limit) is left untouched.
		concurrencyLimit = 1
	}

	entries := newManifestEntries()
	g, gctx := errgroup.WithContext(ctx)
	g.SetLimit(concurrencyLimit)

	// NOTE(review): entries and partitionEvaluators are shared by all workers;
	// presumably both synchronize internally — confirm against their definitions.
	partitionEvaluators := newKeyDefaultMap(scan.buildPartitionEvaluator)

	for _, mf := range manifestList {
		if !scan.checkSequenceNumber(minSeqNum, mf) {
			continue
		}

		// The closure captures mf directly, which relies on per-iteration loop
		// variables (Go 1.22+).
		g.Go(func() error {
			// gctx is cancelled when the caller's ctx is cancelled or when a
			// sibling worker has already failed; skip the manifest read in
			// either case. g.Wait still reports the first error recorded.
			if err := gctx.Err(); err != nil {
				return err
			}

			partEval := partitionEvaluators.Get(int(mf.PartitionSpecID()))
			mfEntries, err := openManifest(scan.io, mf, partEval, metricsEval)
			if err != nil {
				return err
			}

			for _, e := range mfEntries {
				df := e.DataFile()
				switch df.ContentType() {
				case iceberg.EntryContentData:
					entries.addDataEntry(e)
				case iceberg.EntryContentPosDeletes:
					entries.addPositionalDeleteEntry(e)
				case iceberg.EntryContentEqDeletes:
					return errors.New("iceberg-go does not yet support equality deletes")
				default:
					return fmt.Errorf("%w: unknown DataFileContent type (%s): %s",
						ErrInvalidMetadata, df.ContentType(), e)
				}
			}

			return nil
		})
	}

	if err := g.Wait(); err != nil {
		return nil, err
	}

	return entries, nil
}