func()

in controllers/processingjob/processingjob_controller.go [125:208]


// reconcileProcessingJob performs one reconciliation pass for the ProcessingJob
// carried in ctx. In order, it:
//
//  1. Writes an initial status if the resource has none yet.
//  2. Initializes the request context.
//  3. Adds the SageMaker finalizer unless the resource is marked for deletion.
//  4. Describes the job in SageMaker. A nil description means the job does not
//     exist there: the finalizer is removed if the resource is being deleted,
//     otherwise the job is created and described again.
//  5. Acts on the SageMaker status: an in-progress job that is being deleted is
//     asked to stop; a completed, stopped or failed job that is being deleted
//     has its finalizer removed; an unrecognized status is reported as an error.
//  6. Writes the SageMaker status and failure reason back to the resource.
//
// Most failures are recorded through updateStatusAndReturnError before being
// returned; errors from the initial status write, from adding the finalizer and
// from the final status write are returned directly.
func (r *Reconciler) reconcileProcessingJob(ctx reconcileRequestContext) error {
	var err error

	// Set first-touch status
	if ctx.ProcessingJob.Status.ProcessingJobStatus == "" {
		if err = r.updateStatus(ctx, controllers.InitializingJobStatus); err != nil {
			return err
		}
	}

	if err = r.initializeContext(&ctx); err != nil {
		return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to initialize operator"))
	}

	// Add finalizer if it's not marked for deletion.
	if !controllers.HasDeletionTimestamp(ctx.ProcessingJob.ObjectMeta) {
		if !controllers.ContainsString(ctx.ProcessingJob.ObjectMeta.GetFinalizers(), controllers.SageMakerResourceFinalizerName) {
			ctx.ProcessingJob.ObjectMeta.Finalizers = append(ctx.ProcessingJob.ObjectMeta.Finalizers, controllers.SageMakerResourceFinalizerName)
			if err = r.Update(ctx, ctx.ProcessingJob); err != nil {
				return errors.Wrap(err, "Failed to add finalizer")
			}
			ctx.Log.Info("Finalizer added")
		}
	}

	// Get the ProcessingJob from SageMaker
	if ctx.ProcessingJobDescription, err = ctx.SageMakerClient.DescribeProcessingJob(ctx, ctx.ProcessingJobName); err != nil {
		return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to describe SageMaker processing job"))
	}

	// The resource does not exist within SageMaker yet.
	if ctx.ProcessingJobDescription == nil {
		if controllers.HasDeletionTimestamp(ctx.ProcessingJob.ObjectMeta) {
			return r.removeFinalizer(ctx)
		}

		if err = r.createProcessingJob(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to create processing job"))
		}

		if ctx.ProcessingJobDescription, err = ctx.SageMakerClient.DescribeProcessingJob(ctx, ctx.ProcessingJobName); err != nil {
			return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to describe SageMaker processing job"))
		}

		// The describe call can still report "not found" (nil) right after a
		// successful create. Surface that as an error instead of dereferencing
		// a nil description below.
		if ctx.ProcessingJobDescription == nil {
			return r.updateStatusAndReturnError(ctx, errors.New("Unable to describe SageMaker processing job: job not found after creation"))
		}
	}

	switch *ctx.ProcessingJobDescription.ProcessingJobStatus {
	case sagemaker.ProcessingJobStatusInProgress:
		if controllers.HasDeletionTimestamp(ctx.ProcessingJob.ObjectMeta) {
			// Request to stop the job. A 404 from the stop call is tolerated.
			// NOTE(review): this uses the training-job 404 helper for a
			// processing job — confirm it matches StopProcessingJob errors.
			if _, err = ctx.SageMakerClient.StopProcessingJob(ctx, ctx.ProcessingJobName); err != nil && !clientwrapper.IsStopTrainingJob404Error(err) {
				return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to delete processing job"))
			}
			// Describe the new state of the job
			if ctx.ProcessingJobDescription, err = ctx.SageMakerClient.DescribeProcessingJob(ctx, ctx.ProcessingJobName); err != nil {
				return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to describe SageMaker processing job"))
			}
			// The job no longer exists in SageMaker and the resource is being
			// deleted, so there is nothing left to wait for.
			if ctx.ProcessingJobDescription == nil {
				return r.removeFinalizer(ctx)
			}
		}

	case sagemaker.ProcessingJobStatusCompleted, sagemaker.ProcessingJobStatusStopped, sagemaker.ProcessingJobStatusFailed:
		// Terminal states: deletion can proceed immediately.
		if controllers.HasDeletionTimestamp(ctx.ProcessingJob.ObjectMeta) {
			return r.removeFinalizer(ctx)
		}

	case sagemaker.ProcessingJobStatusStopping:
		// Nothing to do here; the status is written back below.

	default:
		unknownStateError := errors.New(fmt.Sprintf("Unknown Processing Job Status: %s", *ctx.ProcessingJobDescription.ProcessingJobStatus))
		return r.updateStatusAndReturnError(ctx, unknownStateError)
	}

	// Read the status from the (possibly refreshed) description so that a stop
	// request issued above is reflected in the written status.
	status := *ctx.ProcessingJobDescription.ProcessingJobStatus
	additional := controllers.GetOrDefault(ctx.ProcessingJobDescription.FailureReason, "")

	return r.updateStatusWithAdditional(ctx, status, additional)
}