func()

in controllers/trainingjob/trainingjob_controller.go [135:247]


// reconcileTrainingJob performs one reconcile pass for a TrainingJob resource.
//
// It records a first-touch status, initializes the request context, adds the
// SageMaker finalizer to objects that are not being deleted, and then brings the
// Kubernetes object in line with what SageMaker reports for the job:
//   - job not found in SageMaker: remove the finalizer if the object is being
//     deleted, otherwise create the job and describe it again;
//   - job InProgress and object being deleted: ask SageMaker to stop the job;
//   - job Completed: record the model path in the status;
//   - job Completed/Stopped/Failed and object being deleted: remove the finalizer;
//   - job Stopping: nothing to do this pass;
//   - any other status: recorded as an error.
//
// Debug rule evaluation statuses are copied into the status whenever they differ
// from SageMaker's. Finally the SageMaker primary status, secondary status and
// failure reason are written to the resource status.
//
// Errors from SageMaker calls are recorded with ReconcilingTrainingJobStatus via
// updateStatusAndReturnError and returned to the caller.
func (r *Reconciler) reconcileTrainingJob(ctx reconcileRequestContext) error {
	var err error

	// Set first-touch status
	if ctx.TrainingJob.Status.TrainingJobStatus == "" {
		if err = r.updateStatus(ctx, controllers.InitializingJobStatus, ""); err != nil {
			return err
		}
	}

	if err = r.initializeContext(&ctx); err != nil {
		return r.updateStatusAndReturnError(ctx, string(sagemaker.TrainingJobStatusFailed), "", errors.Wrap(err, "Unable to initialize operator"))
	}

	// Add finalizer if it's not marked for deletion.
	if !controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) {
		if !controllers.ContainsString(ctx.TrainingJob.ObjectMeta.GetFinalizers(), controllers.SageMakerResourceFinalizerName) {
			ctx.TrainingJob.ObjectMeta.Finalizers = append(ctx.TrainingJob.ObjectMeta.Finalizers, controllers.SageMakerResourceFinalizerName)
			if err := r.Update(ctx, ctx.TrainingJob); err != nil {
				return errors.Wrap(err, "Failed to add finalizer")
			}
			ctx.Log.Info("Finalizer added")
		}
	}

	// Get the TrainingJob from SageMaker
	if ctx.TrainingJobDescription, err = ctx.SageMakerClient.DescribeTrainingJob(ctx, ctx.TrainingJobName); err != nil {
		return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to describe SageMaker training job"))
	}

	// The resource does not exist within SageMaker yet.
	if ctx.TrainingJobDescription == nil {
		if controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) {
			return r.removeFinalizer(ctx)
		}

		if err = r.createTrainingJob(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to create training job"))
		}

		if ctx.TrainingJobDescription, err = ctx.SageMakerClient.DescribeTrainingJob(ctx, ctx.TrainingJobName); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to describe SageMaker training job"))
		}
		// The describe wrapper reports "not found" as a nil description (see the
		// check above). Everything below dereferences the description, so return
		// an error rather than panicking if the just-created job is not visible yet.
		if ctx.TrainingJobDescription == nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.New("Unable to describe SageMaker training job: job not found after creation"))
		}
	}

	// Copy the debug rule evaluation statuses into the k8s status when they differ
	// from SageMaker's. The sets differ if the number of rules differs (initially
	// the k8s status is empty) or if any rule's evaluation status differs.
	observedDebug := ctx.TrainingJobDescription.DebugRuleEvaluationStatuses
	recordedDebug := ctx.TrainingJob.Status.DebugRuleEvaluationStatuses
	debugStatusesChanged := len(observedDebug) != len(recordedDebug)
	for i := 0; !debugStatusesChanged && i < len(observedDebug); i++ {
		observed := controllers.GetOrDefault(observedDebug[i].RuleEvaluationStatus, "")
		recorded := controllers.GetOrDefault(recordedDebug[i].RuleEvaluationStatus, "")
		debugStatusesChanged = observed != recorded
	}
	if debugStatusesChanged {
		if err = r.addDebugRuleEvaluationStatusesToStatus(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to add debug job statuses to status"))
		}
	}

	status := controllers.GetOrDefault(ctx.TrainingJobDescription.TrainingJobStatus, "")
	switch status {
	case sagemaker.TrainingJobStatusInProgress:
		if controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) {
			// Request to stop the job; a StopTrainingJob 404 is deliberately ignored.
			if _, err := ctx.SageMakerClient.StopTrainingJob(ctx, ctx.TrainingJobName); err != nil && !clientwrapper.IsStopTrainingJob404Error(err) {
				return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to delete training job"))
			}
			// Describe the new state of the job
			if ctx.TrainingJobDescription, err = ctx.SageMakerClient.DescribeTrainingJob(ctx, ctx.TrainingJobName); err != nil {
				return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to describe SageMaker training job"))
			}
			// The status update below dereferences the description; guard against
			// the wrapper reporting the job as not found.
			if ctx.TrainingJobDescription == nil {
				return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.New("Unable to describe SageMaker training job: job not found after stop request"))
			}
		}

	case sagemaker.TrainingJobStatusCompleted:
		if err = r.addModelPathToStatus(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to add model path to status"))
		}
		// Completed jobs also get the terminal-state finalizer handling below.
		fallthrough

	case sagemaker.TrainingJobStatusStopped, sagemaker.TrainingJobStatusFailed:
		if controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) {
			return r.removeFinalizer(ctx)
		}

	case sagemaker.TrainingJobStatusStopping:
		// Nothing to do on this pass; only the status below is refreshed.

	default:
		unknownStateError := errors.New(fmt.Sprintf("Unknown Training Job Status: %s", status))
		return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", unknownStateError)
	}

	// Re-read from the description: it may have been refreshed after a stop request.
	primaryStatus := controllers.GetOrDefault(ctx.TrainingJobDescription.TrainingJobStatus, "")
	secondaryStatus := controllers.GetOrDefault(ctx.TrainingJobDescription.SecondaryStatus, "")
	additional := controllers.GetOrDefault(ctx.TrainingJobDescription.FailureReason, "")

	if primaryStatus == string(sagemaker.TrainingJobStatusStopping) {
		// Clear the secondary status if we detected stopping, since SageMaker has unclear secondary statuses during this phase
		// Open ticket with the SageMaker team: https://t.corp.amazon.com/0411302791
		secondaryStatus = ""
	}

	return r.updateStatusWithAdditional(ctx, primaryStatus, secondaryStatus, additional)
}