func()

in pkg/storage/cassandra/store.go [1062:1201]


// GetTaskConfigs returns the task configs of the requested instances of a job
// at the given config version, together with the job's config add-on.
//
// The rows of the requested instances plus the default config row
// (common.DefaultTaskConfigID) are read from taskConfigV2Table first. If that
// read returns no rows, the same query is run against the legacy
// taskConfigTable; rows found there are back-filled into task_config_v2 on a
// best-effort basis (a back-fill failure is logged and counted, but is not
// returned to the caller).
//
// Instances without an overridden config row are mapped to the default
// config. If neither table has a matching row, an empty map and a nil add-on
// are returned with a nil error. A NotFound error is returned when an
// instance has no overridden config and no default config row was read.
func (s *Store) GetTaskConfigs(ctx context.Context, id *peloton.JobID,
	instanceIDs []uint32, version uint64) (map[uint32]*task.TaskConfig, *models.ConfigAddOn, error) {
	taskConfigMap := make(map[uint32]*task.TaskConfig, len(instanceIDs))
	var configAddOn *models.ConfigAddOn
	var backFill bool

	// Query the requested instances plus the default instance ID so that the
	// default config is read by the same query.
	dbInstanceIDs := make([]int, 0, len(instanceIDs)+1)
	for _, instance := range instanceIDs {
		dbInstanceIDs = append(dbInstanceIDs, int(instance))
	}
	dbInstanceIDs = append(dbInstanceIDs, common.DefaultTaskConfigID)

	// The same filter is used for both the v2 and the legacy table.
	filter := qb.Eq{
		"job_id":      id.GetValue(),
		"version":     version,
		"instance_id": dbInstanceIDs,
	}

	stmt := s.DataStore.NewQuery().Select("*").From(taskConfigV2Table).
		Where(filter)
	allResults, err := s.executeRead(ctx, stmt)
	if err != nil {
		log.WithField("job_id", id.GetValue()).
			WithField("instance_ids", instanceIDs).
			WithField("version", version).
			WithError(err).
			Error("Failed to get task configs")
		s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
		return taskConfigMap, nil, err
	}

	if len(allResults) == 0 {
		// Try to get task configs from legacy task_config table
		legacyStmt := s.DataStore.NewQuery().Select("*").From(taskConfigTable).
			Where(filter)
		allResults, err = s.executeRead(ctx, legacyStmt)
		if err != nil {
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return taskConfigMap,
				nil,
				errors.Wrap(
					err,
					fmt.Sprintf(
						"failed to get task configs for %v", id.GetValue()),
				)
		}
		if len(allResults) == 0 {
			// Nothing in either table: report "no configs" rather than an error.
			return taskConfigMap, nil, nil
		}
		s.metrics.TaskMetrics.TaskGetConfigLegacy.Inc(1)
		backFill = true
	}

	var defaultConfig *task.TaskConfig
	// Read all the overridden task configs and the default task config
	for _, value := range allResults {
		var record TaskConfigRecord
		if err := FillObject(value, &record, reflect.TypeOf(record)); err != nil {
			log.WithField("value", value).
				WithError(err).
				Error("Failed to Fill into TaskRecord")
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return nil, nil, err
		}
		taskConfig, err := record.GetTaskConfig()
		if err != nil {
			// Count and log this failure like the other decode failures in
			// this loop.
			log.WithField("value", value).
				WithError(err).
				Error("Failed to get task config from record")
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return nil, nil, err
		}
		// Read config addon from the first result entry that has one. This is
		// because config add-on is same for all tasks of a job. It is read
		// before the default-config check so that the add-on is still
		// returned when only the default config row exists.
		if configAddOn == nil {
			if configAddOn, err = record.GetConfigAddOn(); err != nil {
				log.WithField("value", value).
					WithError(err).
					Error("Failed to Unmarshal system labels")
				s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
				return nil, nil, err
			}
		}
		if record.InstanceID == common.DefaultTaskConfigID {
			// get the default config
			defaultConfig = taskConfig
			continue
		}
		taskConfigMap[uint32(record.InstanceID)] = taskConfig
	}

	// Fill the instances which don't have a overridden config with the default
	// config
	for _, instance := range instanceIDs {
		if _, ok := taskConfigMap[instance]; ok {
			continue
		}
		// use the default config for this instance
		if defaultConfig == nil {
			// we should never be here.
			// Either every instance has a override config or we have a
			// default config.
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return nil, nil, yarpcerrors.NotFoundErrorf("unable to read default task config")
		}
		taskConfigMap[instance] = defaultConfig
	}

	if backFill {
		// back fill entry from task_config to task_config_v2
		//
		// NOTE(review): only the requested instance IDs are written; the
		// default config row itself does not appear to be back-filled here.
		// Confirm that later reads mixing back-filled and other instances
		// behave as intended.
		worker := func(i uint32) error {
			cfg, ok := taskConfigMap[i]
			if !ok {
				return yarpcerrors.NotFoundErrorf(
					"failed to get config for instance %v", i,
				)
			}
			return s.taskConfigV2Ops.Create(
				ctx,
				id,
				int64(i),
				cfg,
				configAddOn,
				nil,
				version,
			)
		}
		// Back-fill is best effort: a failure is recorded but the configs
		// that were read are still returned.
		if err := util.RunInParallel(id.GetValue(), instanceIDs, worker); err != nil {
			log.WithError(err).Info("failed to backfill task_config_v2")
			s.metrics.TaskMetrics.TaskConfigBackFillFail.Inc(1)
		} else {
			s.metrics.TaskMetrics.TaskConfigBackFill.Inc(1)
		}
	}
	s.metrics.TaskMetrics.TaskGetConfigs.Inc(1)
	return taskConfigMap, configAddOn, nil
}