in db/rdbms/migration/0002_migrate_descriptor_to_extended_descriptor.go [355:403]
// up migrates every row of the jobs table to the extended descriptor format.
//
// It first counts the rows in the jobs table, then walks the table in shards of
// shardSize rows, fetching each shard with fetchJobs and converting it with
// migrateJobs. It returns a wrapped error as soon as the count query or any
// shard fails; shards processed before the failure are not handled here.
func (m *DescriptorMigration) up(db dbConn) error {
	// Count how many entries we have in jobs table that we need to migrate. Split them into
	// shards of size shardSize for migration. Can't be done online within a single transaction,
	// as there cannot be two active queries on the same connection at the same time
	// (see https://github.com/lib/pq/issues/81)
	count := uint64(0)
	ctx := m.Context
	ctx.Debugf("counting the number of jobs to migrate")
	start := time.Now()
	rows, err := db.Query("select count(*) from jobs")
	if err != nil {
		return fmt.Errorf("could not fetch number of records to migrate: %w", err)
	}
	if !rows.Next() {
		const msg = "could not fetch number of records to migrate, at least one result from count(*) expected"
		// Read the iteration error before releasing the result set.
		rowsErr := rows.Err()
		// Best-effort close: the more relevant failure is already being returned.
		_ = rows.Close()
		if rowsErr == nil {
			// The message is passed as an argument, never as the format string.
			return fmt.Errorf("%s", msg)
		}
		return fmt.Errorf("%s (err: %w)", msg, rowsErr)
	}
	if err := rows.Scan(&count); err != nil {
		// Release the result set on the error path too, so the query does not
		// stay active on the connection.
		_ = rows.Close()
		return fmt.Errorf("could not fetch number of records to migrate: %w", err)
	}
	// The rows must be closed before the shard queries below are issued.
	if err := rows.Close(); err != nil {
		ctx.Warnf("could not close rows after count(*) query: %v", err)
	}
	// Measure only the count query, so the "fetched in" figure below does not
	// include plugin registry setup.
	elapsed := time.Since(start)
	// Create a new plugin registry. This is necessary because some information that need to be
	// associated with the extended_descriptor is not available in the db and can only be looked
	// up via the TestFetcher.
	registry := pluginregistry.NewPluginRegistry(ctx)
	initPlugins(registry, ctx.Logger())
	ctx.Debugf("total number of jobs to migrate: %d, fetched in %.3f ms", count, ms(elapsed))
	for offset := uint64(0); offset < count; offset += shardSize {
		jobs, err := m.fetchJobs(db, shardSize, offset)
		if err != nil {
			return fmt.Errorf("could not fetch events in range offset %d limit %d: %w", offset, shardSize, err)
		}
		err = m.migrateJobs(db, jobs, registry)
		if err != nil {
			return fmt.Errorf("could not migrate events in range offset %d limit %d: %w", offset, shardSize, err)
		}
		// Report the rows covered so far, including the shard just migrated,
		// capped at the total because the last shard may be partial.
		migrated := offset + shardSize
		if migrated > count {
			migrated = count
		}
		ctx.Infof("migrated %d/%d", migrated, count)
	}
	return nil
}