in series_migrator.go [321:364]
// migrateStep runs a single migration step over all trackers in m.trackers
// that do not yet report done(). For each such tracker it obtains a fresh
// Appender from appendable, calls the tracker's migrate method with from,
// through and m.v1Path, and commits the Appender on success or rolls it back
// on failure.
//
// Trackers are handed to a pool of worker goroutines over an unbuffered
// channel. The pool has m.parallelism workers, but never fewer than one, so
// that a zero or negative setting cannot leave the feeding goroutine blocked
// with nobody to receive from it.
//
// The first error returned by any goroutine cancels the errgroup's context,
// which stops the feed, and is the error returned by migrateStep. A cancelled
// parent ctx is reported as ctx.Err().
func (m *storageMigrator) migrateStep(ctx context.Context, from, through model.Time, appendable tsdb.Appendable) error {
	g, ctx := errgroup.WithContext(ctx)
	trackers := make(chan *seriesFileTracker)

	// Producer: feed every unfinished tracker to the workers. Closing the
	// channel on exit lets the workers' range loops terminate.
	g.Go(func() error {
		defer close(trackers)
		for _, tr := range m.trackers {
			if tr.done() {
				continue
			}
			select {
			case trackers <- tr:
			case <-ctx.Done():
				// A worker failed or the caller cancelled; stop feeding.
				return ctx.Err()
			}
		}
		return nil
	})

	// Always start at least one worker: with none, the producer above would
	// block forever on its first send and g.Wait() would never return.
	workers := m.parallelism
	if workers < 1 {
		workers = 1
	}

	for i := 0; i < workers; i++ {
		g.Go(func() error {
			for tr := range trackers {
				// One Appender per tracker, so each tracker's samples are
				// committed or rolled back independently of the others.
				app := appendable.Appender()
				if err := tr.migrate(from, through, m.v1Path, app); err != nil {
					// The rollback result is deliberately not propagated so
					// that the migrate error is what reaches the caller.
					// TODO: Log error?
					app.Rollback()
					return err
				}
				if err := app.Commit(); err != nil {
					return fmt.Errorf("error committing samples: %s", err)
				}
				// Non-blocking cancellation check between trackers.
				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
			}
			return nil
		})
	}
	return g.Wait()
}