func()

in table/table.go [113:191]


func (t Table) AllManifests() iter.Seq2[iceberg.ManifestFile, error] {
	type list = tblutils.Enumerated[[]iceberg.ManifestFile]
	g := errgroup.Group{}

	n := len(t.metadata.Snapshots())
	ch := make(chan list, n)
	for i, sn := range t.metadata.Snapshots() {
		g.Go(func() error {
			manifests, err := sn.Manifests(t.fs)
			if err != nil {
				return err
			}

			ch <- list{Index: i, Value: manifests, Last: i == n-1}

			return nil
		})
	}

	errch := make(chan error, 1)
	go func() {
		defer close(errch)
		defer close(ch)
		if err := g.Wait(); err != nil {
			errch <- err
		}
	}()

	results := tblutils.MakeSequencedChan(uint(n), ch,
		func(left, right *list) bool {
			switch {
			case left.Index < 0:
				return true
			case right.Index < 0:
				return false
			default:
				return left.Index < right.Index
			}
		}, func(prev, next *list) bool {
			if prev.Index < 0 {
				return next.Index == 0
			}

			return next.Index == prev.Index+1
		}, list{Index: -1})

	return func(yield func(iceberg.ManifestFile, error) bool) {
		defer func() {
			// drain channels if we exited early
			go func() {
				for range results {
				}
				for range errch {
				}
			}()
		}()

		for {
			select {
			case err := <-errch:
				if err != nil {
					yield(nil, err)

					return
				}
			case next, ok := <-results:
				for _, mf := range next.Value {
					if !yield(mf, nil) {
						return
					}
				}

				if next.Last || !ok {
					return
				}
			}
		}
	}
}