in pkg/cli/update_actions.go [57:179]
// UpdateCreateAction creates a new update for the job identified by jobID,
// using the job configuration parsed from the YAML file at cfg.
//
// The resource pool at respoolPath is resolved to an ID and stored in the
// job configuration. batchSize, maxInstanceAttempts, maxFailureInstances,
// updateRollbackOnFailure, updateStartInPausedState and inPlace are copied
// into the update configuration of the request. A non-empty opaqueData is
// attached to the request as opaque data.
//
// When configVersion is greater than zero it must equal the job's current
// configuration version, otherwise an error is returned. If the job already
// references an update that has not terminated, the call fails unless
// override is set, in which case a notice is printed and the new update is
// still requested.
//
// An InvalidArgument error from the update service whose message equals
// invalidVersionError causes the job runtime to be fetched again and the
// request to be retried; any other error is returned to the caller.
func (c *Client) UpdateCreateAction(
	jobID string,
	cfg string,
	batchSize uint32,
	respoolPath string,
	configVersion uint64,
	override bool,
	maxInstanceAttempts uint32,
	maxFailureInstances uint32,
	updateRollbackOnFailure bool,
	updateStartInPausedState bool,
	opaqueData string,
	inPlace bool) error {
	// load and decode the job configuration file
	raw, err := ioutil.ReadFile(cfg)
	if err != nil {
		return fmt.Errorf("unable to open file %s: %v", cfg, err)
	}

	var jobConfig job.JobConfig
	if parseErr := yaml.Unmarshal(raw, &jobConfig); parseErr != nil {
		return fmt.Errorf("unable to parse file %s: %v", cfg, parseErr)
	}

	// resolve the resource pool path and record its ID in the configuration
	poolID, err := c.LookupResourcePoolID(respoolPath)
	if err != nil {
		return err
	}
	if poolID == nil {
		return fmt.Errorf("unable to find resource pool ID for "+
			":%s", respoolPath)
	}
	jobConfig.RespoolID = poolID

	// Each pass re-reads the job runtime so that the request is stamped with
	// the configuration version the job has at that moment. The loop is left
	// only through one of the return statements below.
	for {
		getResp, err := c.jobClient.Get(c.ctx, &job.GetRequest{
			Id: &peloton.JobID{Value: jobID},
		})
		if err != nil {
			return err
		}

		runtimeInfo := getResp.GetJobInfo().GetRuntime()
		if runtimeInfo == nil {
			return fmt.Errorf("unable to find the job to update")
		}

		// a caller-supplied version must match what the job currently has
		currentVersion := runtimeInfo.GetConfigurationVersion()
		if configVersion > 0 && currentVersion != configVersion {
			return fmt.Errorf(
				"invalid input configuration version current %v provided %v",
				currentVersion, configVersion)
		}

		// refuse to proceed over an unfinished update unless told to override
		activeUpdate := runtimeInfo.GetUpdateID()
		if activeUpdate != nil && len(activeUpdate.GetValue()) > 0 {
			done, err := c.isUpdateTerminated(activeUpdate)
			if err != nil {
				return err
			}
			if !done {
				if !override {
					return fmt.Errorf(
						"cannot create a new update as another update is already running")
				}
				fmt.Fprintf(tabWriter, "going to override existing update: %v\n",
					activeUpdate.GetValue())
				tabWriter.Flush()
			}
		}

		// stamp the configuration with the version that was just observed
		jobConfig.ChangeLog = &peloton.ChangeLog{Version: currentVersion}

		request := &updatesvc.CreateUpdateRequest{
			JobId:     &peloton.JobID{Value: jobID},
			JobConfig: &jobConfig,
			UpdateConfig: &update.UpdateConfig{
				BatchSize:           batchSize,
				MaxInstanceAttempts: maxInstanceAttempts,
				MaxFailureInstances: maxFailureInstances,
				RollbackOnFailure:   updateRollbackOnFailure,
				StartPaused:         updateStartInPausedState,
				InPlace:             inPlace,
			},
		}
		// opaque data is only attached when the caller supplied some
		if len(opaqueData) > 0 {
			request.OpaqueData = &peloton.OpaqueData{Data: opaqueData}
		}

		response, err := c.updateClient.CreateUpdate(c.ctx, request)
		if err == nil {
			printUpdateCreateResponse(response, c.Debug)
			return nil
		}

		// only the invalid-version rejection is retried; everything else is
		// handed back to the caller unchanged
		retryable := yarpcerrors.IsInvalidArgument(err) &&
			yarpcerrors.FromError(err).Message() == invalidVersionError
		if !retryable {
			return err
		}
	}
}