func()

in table/snapshot_producers.go [506:596]


func (sp *snapshotProducer) manifests() ([]iceberg.ManifestFile, error) {
	var g errgroup.Group

	results := [...][]iceberg.ManifestFile{nil, nil, nil}

	if len(sp.addedFiles) > 0 {
		g.Go(func() error {
			out, path, err := sp.newManifestOutput()
			if err != nil {
				return err
			}
			defer out.Close()

			counter := &internal.CountingWriter{W: out}

			wr, err := iceberg.NewManifestWriter(sp.txn.meta.formatVersion, counter,
				sp.txn.meta.CurrentSpec(), sp.txn.meta.CurrentSchema(),
				sp.snapshotID)
			if err != nil {
				return err
			}

			for _, df := range sp.addedFiles {
				err := wr.Add(iceberg.NewManifestEntry(iceberg.EntryStatusADDED, &sp.snapshotID,
					nil, nil, df))
				if err != nil {
					return err
				}
			}

			mf, err := wr.ToManifestFile(path, counter.Count)
			if err == nil {
				results[0] = append(results[0], mf)
			}

			return err
		})
	}

	deleted, err := sp.deletedEntries()
	if err != nil {
		return nil, err
	}

	if len(deleted) > 0 {
		g.Go(func() error {
			partitionGroups := map[int][]iceberg.ManifestEntry{}
			for _, entry := range deleted {
				specid := int(entry.DataFile().SpecID())

				group := partitionGroups[specid]
				partitionGroups[specid] = append(group, entry)
			}

			for specid, entries := range partitionGroups {
				out, path, err := sp.newManifestOutput()
				if err != nil {
					return err
				}
				defer out.Close()

				mf, err := iceberg.WriteManifest(path, out, sp.txn.meta.formatVersion,
					sp.spec(specid), sp.txn.meta.CurrentSchema(), sp.snapshotID, entries)
				if err != nil {
					return err
				}
				results[1] = append(results[1], mf)
			}

			return nil
		})
	}

	g.Go(func() error {
		m, err := sp.existingManifests()
		if err != nil {
			return err
		}
		results[2] = m

		return nil
	})

	if err := g.Wait(); err != nil {
		return nil, err
	}

	manifests := slices.Concat(results[0], results[1], results[2])

	return sp.processManifests(manifests)
}