func()

in db/rdbms/migration/0002_migrate_descriptor_to_extended_descriptor.go [116:174]


func (m *DescriptorMigration) fetchJobs(db dbConn, limit, offset uint64) ([]Request, error) {
	log := m.Context.Logger()

	log.Debugf("fetching shard limit: %d, offset: %d", limit, offset)
	selectStatement := "select job_id, name, requestor, server_id, request_time, descriptor, teststeps from jobs limit ? offset ?"
	log.Debugf("running query: %s", selectStatement)

	start := time.Now()
	rows, err := db.Query(selectStatement, limit, offset)

	elapsed := time.Since(start)
	log.Debugf("select query executed in: %.3f ms", ms(elapsed))

	if err != nil {
		return nil, fmt.Errorf("could not get job request (limit %d, offset %d): %w", limit, offset, err)
	}
	defer func() {
		_ = rows.Close()
	}()

	var jobs []Request

	start = time.Now()
	for rows.Next() {
		job := Request{}

		// `teststeps` column was added relateively late to the `jobs` table, so
		// pre-existing columns have acquired a NULL value. This needs to be taken
		// into account during migration
		testDescriptors := sql.NullString{}

		err := rows.Scan(
			&job.JobID,
			&job.JobName,
			&job.Requestor,
			&job.ServerID,
			&job.RequestTime,
			&job.JobDescriptor,
			&testDescriptors,
		)
		if err != nil {
			return nil, fmt.Errorf("could not scan job request (limit %d, offset %d): %w", limit, offset, err)
		}
		if testDescriptors.Valid {
			job.TestDescriptors = testDescriptors.String
		}
		jobs = append(jobs, job)
	}
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("could not scan job request (limit %d, offset %d): %w", limit, offset, err)
	}

	if len(jobs) == 0 {
		return nil, fmt.Errorf("could not find jobs for limit: %d, offset: %d", limit, offset)
	}
	elapsed = time.Since(start)
	log.Debugf("jobs in shard shard limit: %d, offset: %d fetched in: %.3f ms", limit, offset, ms(elapsed))
	return jobs, nil
}