in db/rdbms/migration/0002_migrate_descriptor_to_extended_descriptor.go [176:331]
// migrateJobs backfills the extended_descriptor column of the jobs table for
// one batch of requests.
//
// For every request, the JSON job descriptor (request.JobDescriptor) and the
// JSON per-test step descriptors (request.TestDescriptors) are merged into a
// single job.ExtendedDescriptor, which is then serialized back to JSON. The
// test name of each entry is obtained by instantiating the test fetcher
// described in the job descriptor through registry and calling Fetch on it.
// All resulting descriptors are written with a single
// `update ... case job_id ... end where job_id in (...)` statement executed
// through db.
//
// Requests with an empty TestDescriptors field are skipped. If no request of
// the batch needs an update, no statement is executed.
//
// An error is returned as soon as any request fails to be converted
// (unmarshalling, test count mismatch, fetcher instantiation, fetch, or
// serialization); in that case nothing is written, because the update
// statement only runs after the whole batch has been converted. An error is
// also returned if the update statement itself fails.
func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registry *pluginregistry.PluginRegistry) error {
	log := m.Context.Logger()
	log.Debugf("migrating %d jobs", len(requests))
	start := time.Now()

	// At most one update per request, so the final size is bounded up front.
	updates := make([]jobDescriptorPair, 0, len(requests))
	for _, request := range requests {
		// Merge JobDescriptor and [][]TestStepDescriptor into a single ExtendedDescriptor.
		// ExtendedDescriptor contains TestStepsDescriptors, whose type is declared as follows:
		//   type TestStepsDescriptors struct {
		//       TestName  string
		//       TestSteps []*TestStepDescriptor
		//   }
		//
		// TestStepDescriptor is instead defined as follows:
		//   type TestStepDescriptor struct {
		//       Name       string
		//       Label      string
		//       Parameters StepParameters
		//   }
		//
		// The previous request.TestDescriptors was actually referring to the JSON
		// representation of the steps, for every test (without any reference to the
		// test name, which would only be part of the global JobDescriptor). This is
		// very ambiguous because, to rebuild all the information of a test (for
		// example upon resume), we need to merge information coming from two
		// different places: the steps description in TestDescriptors and the test
		// name in the top level JobDescriptor.
		//
		// So ExtendedDescriptor holds TestStepsDescriptors instead, which includes
		// both test name and test step information.
		//
		// The TestDescriptors field is removed from the request object.
		var jobDesc job.Descriptor
		if err := json.Unmarshal([]byte(request.JobDescriptor), &jobDesc); err != nil {
			// Report the raw input rather than the decode target: on failure the
			// target is zero or only partially filled and does not help debugging.
			return fmt.Errorf("failed to unmarshal job descriptor for jobID %d (%s): %w", request.JobID, request.JobDescriptor, err)
		}
		if len(request.TestDescriptors) == 0 {
			// These TestDescriptors were probably acquired from entries which pre-existed
			// in ConTest db before adding the `teststeps` column. Just skip the migration
			// of these entries.
			log.Debugf("job request with job id %d has null teststeps value, skipping migration", request.JobID)
			continue
		}
		var stepDescs [][]*test.TestStepDescriptor
		if err := json.Unmarshal([]byte(request.TestDescriptors), &stepDescs); err != nil {
			return fmt.Errorf("failed to unmarshal test step descriptors from request object for jobID %d (%+v): %w", request.JobID, request.TestDescriptors, err)
		}
		// Steps are matched to their test descriptor by position, so both lists
		// must have the same length.
		if len(stepDescs) != len(jobDesc.TestDescriptors) {
			return fmt.Errorf(
				"number of tests described in JobDescriptor (%d) does not match steps stored in db (%d) for jobID %d",
				len(jobDesc.TestDescriptors), len(stepDescs), request.JobID,
			)
		}

		extendedDescriptor := job.ExtendedDescriptor{Descriptor: jobDesc}
		for index, stepDesc := range stepDescs {
			newStepsDesc := test.TestStepsDescriptors{}
			newStepsDesc.TestSteps = append(newStepsDesc.TestSteps, stepDesc...)

			// TestName is normally part of TestFetcher parameters, but it is the
			// responsibility of the test fetcher to return the name of the test from
			// Fetch. So, to complete the backfill, look up the original TestDescriptor
			// from the JobDescriptor, instantiate the TestFetcher accordingly and let
			// it retrieve the test name.
			td := jobDesc.TestDescriptors[index]
			tfb, err := registry.NewTestFetcherBundle(m.Context, td)
			if err != nil {
				return fmt.Errorf("could not instantiate test fetcher for jobID %d based on descriptor %+v: %w", request.JobID, td, err)
			}
			name, stepDescFetched, err := tfb.TestFetcher.Fetch(m.Context, tfb.FetchParameters)
			if err != nil {
				return fmt.Errorf("could not retrieve test description from fetcher for jobID %d: %w", request.JobID, err)
			}

			// Check that the serialization of the steps retrieved by the test fetcher
			// matches the steps stored in the DB. If that is not the case, just print a
			// warning: the underlying test might have changed. We go ahead anyway,
			// assuming the test name is still relevant.
			stepDescFetchedJSON, fetchedErr := json.Marshal(stepDescFetched)
			if fetchedErr != nil {
				log.Warnf("steps description (`%v`) fetched by test fetcher for job %d cannot be serialized: %v", stepDescFetched, request.JobID, fetchedErr)
			}
			stepDescDBJSON, dbErr := json.Marshal(stepDesc)
			if dbErr != nil {
				log.Warnf("steps description (`%v`) fetched from db for job %d cannot be serialized: %v", stepDesc, request.JobID, dbErr)
			}
			// Only compare when both sides were serialized; otherwise an empty buffer
			// would trigger a second, misleading mismatch warning.
			if fetchedErr == nil && dbErr == nil && string(stepDescDBJSON) != string(stepDescFetchedJSON) {
				log.Warnf("steps retrieved by test fetcher and from database do not match (`%v` != `%v`), test description might have changed", string(stepDescDBJSON), string(stepDescFetchedJSON))
			}

			newStepsDesc.TestName = name
			extendedDescriptor.TestStepsDescriptors = append(extendedDescriptor.TestStepsDescriptors, newStepsDesc)
		}

		// Serialize job.ExtendedDescriptor
		extendedDescriptorJSON, err := json.Marshal(extendedDescriptor)
		if err != nil {
			return fmt.Errorf("could not serialize extended descriptor for jobID %d (%+v): %w", request.JobID, extendedDescriptor, err)
		}
		updates = append(updates, jobDescriptorPair{jobID: request.JobID, extendedDescriptor: string(extendedDescriptorJSON)})
	}
	if len(updates) == 0 {
		return nil
	}

	// Build one statement updating every job of the batch:
	//   update jobs set extended_descriptor = case job_id
	//       when ? then ? [when ? then ? ...]
	//   end where job_id in (?[,? ...])
	casePlaceholders := make([]string, 0, len(updates))
	wherePlaceholders := make([]string, 0, len(updates))
	for range updates {
		casePlaceholders = append(casePlaceholders, "when ? then ?")
		wherePlaceholders = append(wherePlaceholders, "?")
	}
	insertStatement := fmt.Sprintf("update jobs set extended_descriptor = case job_id %s end where job_id in (%s)", strings.Join(casePlaceholders, " "), strings.Join(wherePlaceholders, ","))
	log.Debugf("running insert statement with updates: %s, updates: %+v", insertStatement, updates)
	insertStart := time.Now()

	// Arguments follow placeholder order: first every (job_id, descriptor) pair
	// for the case expression, then every job_id again for the `in` list.
	args := make([]interface{}, 0, 3*len(updates))
	for _, v := range updates {
		args = append(args, v.jobID, v.extendedDescriptor)
	}
	for _, v := range updates {
		args = append(args, v.jobID)
	}
	if _, err := db.Exec(insertStatement, args...); err != nil {
		jobIDs := make([]types.JobID, 0, len(updates))
		for _, v := range updates {
			jobIDs = append(jobIDs, v.jobID)
		}
		return fmt.Errorf("could not store extended descriptor (%w) for job ids: %v", err, jobIDs)
	}
	elapsed := time.Since(insertStart)
	log.Debugf("insert statement executed in %.3f ms", ms(elapsed))
	elapsedStart := time.Since(start)
	log.Debugf("completed migrating %d jobs in %.3f ms", len(requests), ms(elapsedStart))
	return nil
}