func()

in pkg/cli/update_actions.go [57:179]


// UpdateCreateAction creates a new update for the job identified by jobID
// using the job configuration read from the YAML file at path cfg.
//
// The resource pool at respoolPath is resolved to an ID and stored in the
// parsed configuration. When configVersion is greater than zero it must match
// the job's current configuration version, otherwise an error is returned.
// If the job runtime references an update that is not yet terminated, the
// call fails unless override is set, in which case a notice is written to
// tabWriter and the request proceeds. The remaining arguments (batchSize,
// maxInstanceAttempts, maxFailureInstances, updateRollbackOnFailure,
// updateStartInPausedState, inPlace) are copied into the update config, and
// opaqueData, when non-empty, is attached to the request.
//
// The fetch/validate/create sequence is repeated for as long as CreateUpdate
// fails with an invalid-argument error whose message equals
// invalidVersionError; any other error is returned to the caller. On success
// the response is printed via printUpdateCreateResponse.
func (c *Client) UpdateCreateAction(
	jobID string,
	cfg string,
	batchSize uint32,
	respoolPath string,
	configVersion uint64,
	override bool,
	maxInstanceAttempts uint32,
	maxFailureInstances uint32,
	updateRollbackOnFailure bool,
	updateStartInPausedState bool,
	opaqueData string,
	inPlace bool) error {
	// load and parse the job configuration file
	raw, err := ioutil.ReadFile(cfg)
	if err != nil {
		return fmt.Errorf("unable to open file %s: %v", cfg, err)
	}
	var jobConfig job.JobConfig
	if parseErr := yaml.Unmarshal(raw, &jobConfig); parseErr != nil {
		return fmt.Errorf("unable to parse file %s: %v", cfg, parseErr)
	}

	// resolve the resource pool path and record its id in the configuration
	poolID, err := c.LookupResourcePoolID(respoolPath)
	if err != nil {
		return err
	}
	if poolID == nil {
		return fmt.Errorf("unable to find resource pool ID for :%s", respoolPath)
	}
	jobConfig.RespoolID = poolID

	// the pieces of the request below do not change between attempts
	var opaque *peloton.OpaqueData
	if opaqueData != "" {
		opaque = &peloton.OpaqueData{Data: opaqueData}
	}
	updateConfig := &update.UpdateConfig{
		BatchSize:           batchSize,
		MaxInstanceAttempts: maxInstanceAttempts,
		MaxFailureInstances: maxFailureInstances,
		RollbackOnFailure:   updateRollbackOnFailure,
		StartPaused:         updateStartInPausedState,
		InPlace:             inPlace,
	}

	var response *updatesvc.CreateUpdateResponse
	for {
		// every attempt starts from a fresh view of the job runtime
		getResp, err := c.jobClient.Get(c.ctx, &job.GetRequest{
			Id: &peloton.JobID{Value: jobID},
		})
		if err != nil {
			return err
		}

		runtimeInfo := getResp.GetJobInfo().GetRuntime()
		if runtimeInfo == nil {
			return fmt.Errorf("unable to find the job to update")
		}

		// a caller-supplied version must match what the job currently has
		currentVersion := runtimeInfo.GetConfigurationVersion()
		if configVersion > 0 && currentVersion != configVersion {
			return fmt.Errorf(
				"invalid input configuration version current %v provided %v",
				currentVersion, configVersion)
		}

		// refuse to clobber an update still in flight unless asked to
		if prevUpdate := runtimeInfo.GetUpdateID(); prevUpdate != nil &&
			len(prevUpdate.GetValue()) > 0 {
			terminal, err := c.isUpdateTerminated(prevUpdate)
			if err != nil {
				return err
			}

			if !terminal {
				if !override {
					return fmt.Errorf(
						"cannot create a new update as another update is already running")
				}
				fmt.Fprintf(tabWriter, "going to override existing update: %v\n",
					prevUpdate.GetValue())
				tabWriter.Flush()
			}
		}

		// stamp the configuration with the version that was just observed
		jobConfig.ChangeLog = &peloton.ChangeLog{Version: currentVersion}

		response, err = c.updateClient.CreateUpdate(c.ctx, &updatesvc.CreateUpdateRequest{
			JobId:        &peloton.JobID{Value: jobID},
			JobConfig:    &jobConfig,
			UpdateConfig: updateConfig,
			OpaqueData:   opaque,
		})
		if err == nil {
			break
		}

		// NOTE(review): presumably this error means the configuration version
		// moved between the Get and the CreateUpdate - confirm against the
		// service; only that specific failure triggers another attempt.
		staleVersion := yarpcerrors.IsInvalidArgument(err) &&
			yarpcerrors.FromError(err).Message() == invalidVersionError
		if !staleVersion {
			return err
		}
	}

	printUpdateCreateResponse(response, c.Debug)
	return nil
}