func()

in pkg/jobmgr/jobsvc/stateless/handler.go [405:519]


// RestartJob restarts all instances of a stateless job, or the subset given
// in the request's restart-spec ranges, by creating a workflow of type
// RESTART on the cached job. On success it returns the new entity version
// produced by the workflow creation.
//
// Only the leader instance may serve this call; non-leaders return an
// Unavailable error. Any error returned to the caller is converted to a
// YARPC error by the deferred handler below.
func (h *serviceHandler) RestartJob(
	ctx context.Context,
	req *svc.RestartJobRequest) (resp *svc.RestartJobResponse, err error) {
	// Log the outcome of the call (warn on failure, info on success) and
	// normalize the outgoing error to a YARPC error. Uses the named result
	// parameters so the defer sees the final resp/err values.
	defer func() {
		headers := yarpcutil.GetHeaders(ctx)
		if err != nil {
			log.WithField("request", req).
				WithField("headers", headers).
				WithError(err).
				Warn("JobSVC.RestartJob failed")
			err = yarpcutil.ConvertToYARPCError(err)
			return
		}

		log.WithField("request", req).
			WithField("response", resp).
			WithField("headers", headers).
			Info("JobSVC.RestartJob succeeded")
	}()

	// Workflow creation mutates job state; only the elected leader may do it.
	if !h.candidate.IsLeader() {
		return nil, yarpcerrors.UnavailableErrorf("JobSVC.RestartJob is not supported on non-leader")
	}

	// Load (or add) the job in the in-memory cache and read its runtime to
	// learn which configuration version is currently active.
	jobID := &peloton.JobID{Value: req.GetJobId().GetValue()}
	cachedJob := h.jobFactory.AddJob(jobID)
	runtime, err := cachedJob.GetRuntime(ctx)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job runtime")
	}

	// Fetch the config/spec at the runtime's current configuration version.
	obj, err := h.jobConfigOps.GetResult(
		ctx,
		jobID,
		runtime.GetConfigurationVersion(),
	)
	if err != nil {
		return nil, errors.Wrap(err, "fail to get job config")
	}
	jobConfig := obj.JobConfig
	configAddOn := obj.ConfigAddOn
	jobSpec := obj.JobSpec

	// copy the config with provided resource version number
	// NOTE: this is a shallow copy of the proto struct — nested message
	// fields are shared with jobConfig; only ChangeLog is replaced below.
	newConfig := *jobConfig
	now := time.Now()
	// Keep the existing config version but refresh both timestamps, so the
	// restart is recorded as a new revision of the same configuration.
	newConfig.ChangeLog = &peloton.ChangeLog{
		Version:   jobConfig.GetVersion(),
		CreatedAt: uint64(now.UnixNano()),
		UpdatedAt: uint64(now.UnixNano()),
	}

	// Mirror the refreshed revision onto the v1alpha spec, when one exists.
	// NOTE(review): when jobSpec is nil, newSpec stays the zero value but
	// &newSpec (a non-nil pointer to an empty spec) is still passed to
	// WithConfig below — confirm WithConfig treats an empty spec as "absent".
	var newSpec stateless.JobSpec
	if jobSpec != nil {
		newSpec = *jobSpec
		newSpec.Revision = &v1alphapeloton.Revision{
			Version:   newConfig.GetChangeLog().GetVersion(),
			CreatedAt: newConfig.GetChangeLog().GetCreatedAt(),
			UpdatedAt: newConfig.GetChangeLog().GetUpdatedAt(),
		}
	}

	// Propagate caller-supplied opaque data to the workflow, if any.
	opaque := cached.WithOpaqueData(nil)
	if req.GetOpaqueData() != nil {
		opaque = cached.WithOpaqueData(&peloton.OpaqueData{
			Data: req.GetOpaqueData().GetData(),
		})
	}

	// Resolve the instance ranges from the request; an empty result means
	// "restart everything", so expand to all instance IDs.
	instancesToUpdate := convertInstanceIDRangesToSlice(req.GetRestartSpec().GetRanges(), newConfig.GetInstanceCount())
	if len(instancesToUpdate) == 0 {
		for i := uint32(0); i < newConfig.GetInstanceCount(); i++ {
			instancesToUpdate = append(instancesToUpdate, i)
		}
	}

	// Create the RESTART workflow. req.GetVersion() is the caller's entity
	// version, used for optimistic-concurrency validation by CreateWorkflow.
	updateID, newEntityVersion, err := cachedJob.CreateWorkflow(
		ctx,
		models.WorkflowType_RESTART,
		&pbupdate.UpdateConfig{
			BatchSize: req.GetRestartSpec().GetBatchSize(),
			InPlace:   req.GetRestartSpec().GetInPlace(),
		},
		req.GetVersion(),
		cached.WithInstanceToProcess(
			nil,
			instancesToUpdate,
			nil),
		cached.WithConfig(
			&newConfig,
			jobConfig,
			configAddOn,
			&newSpec,
		),
		opaque,
	)

	// In case of error, since it is not clear if job runtime was
	// persisted with the update ID or not, enqueue the update to
	// the goal state. If the update ID got persisted, update should
	// start running, else, it should be aborted. Enqueueing it into
	// the goal state will ensure both. In case the update was not
	// persisted, clear the cache as well so that it is reloaded
	// from DB and cleaned up.
	// Add update to goal state engine to start it
	// NOTE(review): the enqueue deliberately happens BEFORE the err check —
	// do not reorder. Also, no explicit cache-clear call is visible in this
	// function despite the comment above; presumably the goal state engine
	// handles it — verify.
	if len(updateID.GetValue()) > 0 {
		h.goalStateDriver.EnqueueUpdate(jobID, updateID, time.Now())
	}

	if err != nil {
		return nil, err
	}

	return &svc.RestartJobResponse{Version: newEntityVersion}, nil
}