in table/table.go [113:191]
func (t Table) AllManifests() iter.Seq2[iceberg.ManifestFile, error] {
type list = tblutils.Enumerated[[]iceberg.ManifestFile]
g := errgroup.Group{}
n := len(t.metadata.Snapshots())
ch := make(chan list, n)
for i, sn := range t.metadata.Snapshots() {
g.Go(func() error {
manifests, err := sn.Manifests(t.fs)
if err != nil {
return err
}
ch <- list{Index: i, Value: manifests, Last: i == n-1}
return nil
})
}
errch := make(chan error, 1)
go func() {
defer close(errch)
defer close(ch)
if err := g.Wait(); err != nil {
errch <- err
}
}()
results := tblutils.MakeSequencedChan(uint(n), ch,
func(left, right *list) bool {
switch {
case left.Index < 0:
return true
case right.Index < 0:
return false
default:
return left.Index < right.Index
}
}, func(prev, next *list) bool {
if prev.Index < 0 {
return next.Index == 0
}
return next.Index == prev.Index+1
}, list{Index: -1})
return func(yield func(iceberg.ManifestFile, error) bool) {
defer func() {
// drain channels if we exited early
go func() {
for range results {
}
for range errch {
}
}()
}()
for {
select {
case err := <-errch:
if err != nil {
yield(nil, err)
return
}
case next, ok := <-results:
for _, mf := range next.Value {
if !yield(mf, nil) {
return
}
}
if next.Last || !ok {
return
}
}
}
}
}