in pkg/storage/cassandra/store.go [1062:1201]
// GetTaskConfigs returns the task configs for the given instances of a job at
// the given config version, keyed by instance ID, together with the config
// add-on read from the same rows.
//
// Rows are read from the task_config_v2 table first. If that table returns no
// rows, the legacy task_config table is queried instead, and on success the
// resolved configs are written back to task_config_v2 on a best-effort basis
// (a back-fill failure is logged and counted but does not fail the call).
//
// Instances without an overridden config are given the default config, which
// is stored under common.DefaultTaskConfigID. If neither table has any row for
// the query, an empty map, a nil add-on and a nil error are returned.
//
// A NotFound yarpc error is returned when an instance has no override and no
// default config row was read. Read and decode errors are returned to the
// caller.
func (s *Store) GetTaskConfigs(ctx context.Context, id *peloton.JobID,
	instanceIDs []uint32, version uint64) (map[uint32]*task.TaskConfig, *models.ConfigAddOn, error) {
	taskConfigMap := make(map[uint32]*task.TaskConfig, len(instanceIDs))
	var configAddOn *models.ConfigAddOn
	var backFill bool

	// Query the requested instances plus the default instance ID, so that the
	// default config is fetched along with any per-instance overrides.
	dbInstanceIDs := make([]int, 0, len(instanceIDs)+1)
	for _, instance := range instanceIDs {
		dbInstanceIDs = append(dbInstanceIDs, int(instance))
	}
	dbInstanceIDs = append(dbInstanceIDs, common.DefaultTaskConfigID)

	stmt := s.DataStore.NewQuery().Select("*").From(taskConfigV2Table).
		Where(
			qb.Eq{
				"job_id":      id.GetValue(),
				"version":     version,
				"instance_id": dbInstanceIDs,
			})
	allResults, err := s.executeRead(ctx, stmt)
	if err != nil {
		log.WithField("job_id", id.GetValue()).
			WithField("instance_ids", instanceIDs).
			WithField("version", version).
			WithError(err).
			Error("Failed to get task configs")
		s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
		return taskConfigMap, nil, err
	}

	if len(allResults) == 0 {
		// Try to get task configs from legacy task_config table
		stmt := s.DataStore.NewQuery().Select("*").From(taskConfigTable).
			Where(
				qb.Eq{
					"job_id":      id.GetValue(),
					"version":     version,
					"instance_id": dbInstanceIDs,
				})
		allResults, err = s.executeRead(ctx, stmt)
		if err != nil {
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return taskConfigMap,
				nil,
				errors.Wrap(
					err,
					fmt.Sprintf(
						"failed to get task configs for %v", id.GetValue()),
				)
		}
		if len(allResults) == 0 {
			// Nothing stored in either table for this query.
			return taskConfigMap, nil, nil
		}
		s.metrics.TaskMetrics.TaskGetConfigLegacy.Inc(1)
		// Rows came from the legacy table; copy them to task_config_v2 below.
		backFill = true
	}

	var defaultConfig *task.TaskConfig
	// Read all the overridden task configs and the default task config
	for _, value := range allResults {
		var record TaskConfigRecord
		if err := FillObject(value, &record, reflect.TypeOf(record)); err != nil {
			log.WithField("value", value).
				WithError(err).
				Error("Failed to Fill into TaskRecord")
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return nil, nil, err
		}

		taskConfig, err := record.GetTaskConfig()
		if err != nil {
			// Count this failure like every other failure path of this method.
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return nil, nil, err
		}

		// Read config addon from the first result entry that yields one. This
		// is because config add-on is same for all tasks of a job. It is read
		// before the default-config check so that a result set containing only
		// the default row (no per-instance overrides) still produces an
		// add-on, both for the caller and for the back-fill below.
		// NOTE(review): assumes the default-config row stores the add-on the
		// same way override rows do; confirm against the write path.
		if configAddOn == nil {
			if configAddOn, err = record.GetConfigAddOn(); err != nil {
				log.WithField("value", value).
					WithError(err).
					Error("Failed to Unmarshal system labels")
				s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
				return nil, nil, err
			}
		}

		if record.InstanceID == common.DefaultTaskConfigID {
			// get the default config
			defaultConfig = taskConfig
			continue
		}
		taskConfigMap[uint32(record.InstanceID)] = taskConfig
	}

	// Fill the instances which don't have a overridden config with the default
	// config
	for _, instance := range instanceIDs {
		if _, ok := taskConfigMap[instance]; ok {
			continue
		}
		// use the default config for this instance
		if defaultConfig == nil {
			// we should never be here.
			// Either every instance has a override config or we have a
			// default config.
			s.metrics.TaskMetrics.TaskGetConfigsFail.Inc(1)
			return nil, nil, yarpcerrors.NotFoundErrorf("unable to read default task config")
		}
		taskConfigMap[instance] = defaultConfig
	}

	if backFill {
		// back fill entry from task_config to task_config_v2
		worker := func(i uint32) error {
			cfg, ok := taskConfigMap[i]
			if !ok {
				// Report the instance that is missing, not the job ID.
				return yarpcerrors.NotFoundErrorf(
					"failed to get config for instance %v", i,
				)
			}
			return s.taskConfigV2Ops.Create(
				ctx,
				id,
				int64(i),
				cfg,
				configAddOn,
				nil,
				version,
			)
		}
		// Back-fill is best effort: a failure is logged and counted, but the
		// configs that were read are still returned to the caller.
		if err := util.RunInParallel(id.GetValue(), instanceIDs, worker); err != nil {
			log.WithError(err).Info("failed to backfill task_config_v2")
			s.metrics.TaskMetrics.TaskConfigBackFillFail.Inc(1)
		} else {
			s.metrics.TaskMetrics.TaskConfigBackFill.Inc(1)
		}
	}

	s.metrics.TaskMetrics.TaskGetConfigs.Inc(1)
	return taskConfigMap, configAddOn, nil
}