in table/snapshot_producers.go [506:596]
// manifests assembles the manifest files for the snapshot being produced.
//
// Up to three groups are gathered concurrently and concatenated in this order:
//
//  1. a newly written manifest holding one ADDED entry per file in
//     sp.addedFiles (skipped when there are no added files),
//  2. newly written manifests for the entries returned by sp.deletedEntries,
//     one manifest per partition spec ID (skipped when nothing was deleted),
//  3. the manifests returned by sp.existingManifests.
//
// The concatenated list is passed through sp.processManifests, whose result is
// returned. The first error encountered by any step is returned instead.
func (sp *snapshotProducer) manifests() ([]iceberg.ManifestFile, error) {
	// Resolve the deleted entries before starting any goroutine. Doing this
	// after the added-files writer has been launched would let an error here
	// return while that goroutine is still running in the background (and
	// still writing a manifest nobody will reference).
	deleted, err := sp.deletedEntries()
	if err != nil {
		return nil, err
	}

	var g errgroup.Group
	// One slot per goroutine. Each goroutine writes only to its own slot, and
	// the slots are read only after g.Wait returns.
	results := [...][]iceberg.ManifestFile{nil, nil, nil}

	if len(sp.addedFiles) > 0 {
		g.Go(func() error {
			out, path, err := sp.newManifestOutput()
			if err != nil {
				return err
			}
			defer out.Close()

			// The counting wrapper tracks the bytes written so the length can
			// be handed to ToManifestFile.
			counter := &internal.CountingWriter{W: out}
			wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
				sp.txn.meta.CurrentSpec(), sp.txn.meta.CurrentSchema(),
				sp.snapshotID)
			if err != nil {
				return err
			}

			for _, df := range sp.addedFiles {
				entry := iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
					nil, nil, df)
				if err := wr.Add(entry); err != nil {
					return err
				}
			}

			// NOTE(review): counter.Count is evaluated before ToManifestFile
			// runs. If ToManifestFile flushes buffered data, the recorded
			// length could be short - confirm against the writer.
			mf, err := wr.ToManifestFile(path, counter.Count)
			if err != nil {
				return err
			}
			results[0] = append(results[0], mf)

			return nil
		})
	}

	if len(deleted) > 0 {
		g.Go(func() error {
			// Group the deleted entries by the partition spec of their data
			// file; each group is written with its own spec.
			partitionGroups := map[int][]iceberg.ManifestEntry{}
			for _, entry := range deleted {
				specid := int(entry.DataFile().SpecID())
				partitionGroups[specid] = append(partitionGroups[specid], entry)
			}

			// Map iteration order is random; walk the spec IDs in sorted order
			// so the resulting manifest order is stable from run to run.
			specIDs := make([]int, 0, len(partitionGroups))
			for specid := range partitionGroups {
				specIDs = append(specIDs, specid)
			}
			slices.Sort(specIDs)

			// Writing each group inside its own function call makes the
			// deferred Close fire as soon as that manifest is written, rather
			// than keeping every output open until the whole goroutine ends.
			writeGroup := func(specid int, entries []iceberg.ManifestEntry) error {
				out, path, err := sp.newManifestOutput()
				if err != nil {
					return err
				}
				defer out.Close()

				mf, err := iceberg.WriteManifest(path, out, sp.txn.meta.formatVersion,
					sp.spec(specid), sp.txn.meta.CurrentSchema(), sp.snapshotID, entries)
				if err != nil {
					return err
				}
				results[1] = append(results[1], mf)

				return nil
			}

			for _, specid := range specIDs {
				if err := writeGroup(specid, partitionGroups[specid]); err != nil {
					return err
				}
			}

			return nil
		})
	}

	g.Go(func() error {
		m, err := sp.existingManifests()
		if err != nil {
			return err
		}
		results[2] = m

		return nil
	})

	if err := g.Wait(); err != nil {
		return nil, err
	}

	manifests := slices.Concat(results[0], results[1], results[2])

	return sp.processManifests(manifests)
}