func()

in db/rdbms/migration/0002_migrate_descriptor_to_extended_descriptor.go [176:331]


func (m *DescriptorMigration) migrateJobs(db dbConn, requests []Request, registry *pluginregistry.PluginRegistry) error {
	log := m.Context.Logger()

	log.Debugf("migrating %d jobs", len(requests))
	start := time.Now()

	var updates []jobDescriptorPair
	for _, request := range requests {

		// Merge JobDescriptor and [][]TestStepDescriptor into a single ExtendedDescriptor.
		// ExtendedDescriptor contains TestStepsDescriptors, whose tpye is declared as follows:
		// type TestStepsDescriptors struct {
		//		TestName    string
		// 		TestSteps   []*TestStepDescriptor
		// }
		//
		// TestStepDescriptor is instead defined as follows:
		// type TestStepDescriptor struct {
		//		Name       string
		//		Label      string
		//		Parameters StepParameters
		// }
		//
		// The previous request.TestDescriptors was actually referring to the JSON
		// representation of the steps, for every test (without any reference to the
		// test name, that would only be part of the global JobDescriptor). This is
		// very ambiguous because to rebuild all the information of a test (for example
		// upon resume, we need to merge information coming from two different places,
		// the steps description in TestDescriptors and the test name in the top level
		// JobDescriptor.
		//
		// So ExtendedDescriptor holds instead to TestStepsDescriptors, which includes
		// test name and test step information.
		//
		// TestDescriptorsfield is removed from request object.

		var jobDesc job.Descriptor
		if err := json.Unmarshal([]byte(request.JobDescriptor), &jobDesc); err != nil {
			return fmt.Errorf("failed to unmarshal job descriptor (%+v): %w", jobDesc, err)
		}

		if len(request.TestDescriptors) == 0 {
			// These TestDescriptors were problably acquired from entries which pre-existed
			// in ConTest db before adding the `teststeps` column. Just skip the migration
			// of these entries.
			log.Debugf("job request with job id %d has null teststeps value, skipping migration", request.JobID)
			continue
		}

		var stepDescs [][]*test.TestStepDescriptor
		if err := json.Unmarshal([]byte(request.TestDescriptors), &stepDescs); err != nil {
			return fmt.Errorf("failed to unmarshal test step descriptors from request object (%+v): %w", request.TestDescriptors, err)
		}

		extendedDescriptor := job.ExtendedDescriptor{Descriptor: jobDesc}
		if len(stepDescs) != len(jobDesc.TestDescriptors) {
			return fmt.Errorf("number of tests described in JobDescriptor does not match steps stored in db")
		}

		for index, stepDesc := range stepDescs {
			newStepsDesc := test.TestStepsDescriptors{}
			// TestName is normally part of TestFetcher parameters, but it's responsibility
			// of the test fetcher to return the name of the Test from the Fetch signature.
			// So, to complete backfill of the data, we initialize directly a TestFetcher
			// and let it retrieve the test name.
			newStepsDesc.TestSteps = append(newStepsDesc.TestSteps, stepDesc...)

			// Look up the original TestDescriptor from JobDescriptor, instantiate
			// TestFetcher accordingly and retrieve the name of the test
			td := jobDesc.TestDescriptors[index]

			tfb, err := registry.NewTestFetcherBundle(m.Context, td)
			if err != nil {
				return fmt.Errorf("could not instantiate test fetcher for jobID %d based on descriptor %+v: %w", request.JobID, td, err)
			}

			name, stepDescFetched, err := tfb.TestFetcher.Fetch(m.Context, tfb.FetchParameters)
			if err != nil {
				return fmt.Errorf("could not retrieve test description from fetcher for jobID %d: %w", request.JobID, err)
			}

			// Check that the serialization of the steps retrieved by the test fetcher matches the steps
			// stored in the DB. If that's not the case, then, just print a warning: the underlying test
			/// might have changed.We go ahead anyway assuming assume the test name is still relevant.
			stepDescFetchedJSON, err := json.Marshal(stepDescFetched)
			if err != nil {
				log.Warnf("steps description (`%v`) fetched by test fetcher for job %d cannot be serialized: %v", stepDescFetched, request.JobID, err)
			}

			stepDescDBJSON, err := json.Marshal(stepDesc)
			if err != nil {
				log.Warnf("steps description (`%v`) fetched from db for job %d cannot be serialized: %v", stepDesc, request.JobID, err)
			}

			if string(stepDescDBJSON) != string(stepDescFetchedJSON) {
				log.Warnf("steps retrieved by test fetcher and from database do not match (`%v` != `%v`), test description might have changed", string(stepDescDBJSON), string(stepDescFetchedJSON))
			}

			newStepsDesc.TestName = name
			extendedDescriptor.TestStepsDescriptors = append(extendedDescriptor.TestStepsDescriptors, newStepsDesc)
		}

		// Serialize job.ExtendedDescriptor
		extendedDescriptorJSON, err := json.Marshal(extendedDescriptor)
		if err != nil {
			return fmt.Errorf("could not serialize extended descriptor for jobID %d (%+v): %w", request.JobID, extendedDescriptor, err)
		}

		updates = append(updates, jobDescriptorPair{jobID: request.JobID, extendedDescriptor: string(extendedDescriptorJSON)})
	}

	if len(updates) == 0 {
		return nil
	}

	var (
		casePlaceholders  []string
		wherePlaceholders []string
	)

	for range updates {
		casePlaceholders = append(casePlaceholders, "when ? then ?")
		wherePlaceholders = append(wherePlaceholders, "?")
	}
	insertStatement := fmt.Sprintf("update jobs set extended_descriptor = case job_id %s end where job_id in (%s)", strings.Join(casePlaceholders, " "), strings.Join(wherePlaceholders, ","))
	log.Debugf("running insert statement with updates: %s, updates: %+v", insertStatement, updates)

	insertStart := time.Now()
	var args []interface{}

	for _, v := range updates {
		args = append(args, v.jobID)
		args = append(args, v.extendedDescriptor)
	}

	for _, v := range updates {
		args = append(args, v.jobID)
	}

	_, err := db.Exec(insertStatement, args...)
	if err != nil {
		var jobIDs []types.JobID
		for _, v := range updates {
			jobIDs = append(jobIDs, v.jobID)
		}
		return fmt.Errorf("could not store extended descriptor (%w) for job ids: %v", err, jobIDs)
	}

	elapsed := time.Since(insertStart)
	log.Debugf("insert statement executed in %.3f ms", ms(elapsed))

	elapsedStart := time.Since(start)
	log.Debugf("completed migrating %d jobs in %.3f ms", len(requests), ms(elapsedStart))

	return nil
}