func()

in series_migrator.go [321:364]


func (m *storageMigrator) migrateStep(ctx context.Context, from, through model.Time, appendable tsdb.Appendable) error {
	g, ctx := errgroup.WithContext(ctx)
	trackers := make(chan *seriesFileTracker)

	g.Go(func() error {
		defer close(trackers)
		for _, tr := range m.trackers {
			if tr.done() {
				continue
			}

			select {
			case trackers <- tr:
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	})

	for i := 0; i < m.parallelism; i++ {
		g.Go(func() error {
			for tr := range trackers {
				app := appendable.Appender()
				if err := tr.migrate(from, through, m.v1Path, app); err != nil {
					app.Rollback() // TODO: Log error?
					return err
				}
				if err := app.Commit(); err != nil {
					return fmt.Errorf("error committing samples: %s", err)
				}

				select {
				case <-ctx.Done():
					return ctx.Err()
				default:
				}
			}
			return nil
		})
	}

	return g.Wait()
}