in db/rdbms/migration/0002_migrate_descriptor_to_extended_descriptor.go [116:174]
func (m *DescriptorMigration) fetchJobs(db dbConn, limit, offset uint64) ([]Request, error) {
log := m.Context.Logger()
log.Debugf("fetching shard limit: %d, offset: %d", limit, offset)
selectStatement := "select job_id, name, requestor, server_id, request_time, descriptor, teststeps from jobs limit ? offset ?"
log.Debugf("running query: %s", selectStatement)
start := time.Now()
rows, err := db.Query(selectStatement, limit, offset)
elapsed := time.Since(start)
log.Debugf("select query executed in: %.3f ms", ms(elapsed))
if err != nil {
return nil, fmt.Errorf("could not get job request (limit %d, offset %d): %w", limit, offset, err)
}
defer func() {
_ = rows.Close()
}()
var jobs []Request
start = time.Now()
for rows.Next() {
job := Request{}
// `teststeps` column was added relateively late to the `jobs` table, so
// pre-existing columns have acquired a NULL value. This needs to be taken
// into account during migration
testDescriptors := sql.NullString{}
err := rows.Scan(
&job.JobID,
&job.JobName,
&job.Requestor,
&job.ServerID,
&job.RequestTime,
&job.JobDescriptor,
&testDescriptors,
)
if err != nil {
return nil, fmt.Errorf("could not scan job request (limit %d, offset %d): %w", limit, offset, err)
}
if testDescriptors.Valid {
job.TestDescriptors = testDescriptors.String
}
jobs = append(jobs, job)
}
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("could not scan job request (limit %d, offset %d): %w", limit, offset, err)
}
if len(jobs) == 0 {
return nil, fmt.Errorf("could not find jobs for limit: %d, offset: %d", limit, offset)
}
elapsed = time.Since(start)
log.Debugf("jobs in shard shard limit: %d, offset: %d fetched in: %.3f ms", limit, offset, ms(elapsed))
return jobs, nil
}