in controllers/processingjob/processingjob_controller.go [125:208]
// reconcileProcessingJob performs one reconciliation pass for the ProcessingJob
// carried by ctx.
//
// The pass proceeds in order:
//  1. Record a first-touch status when the resource has none.
//  2. Initialize the request context.
//  3. Add the SageMaker finalizer unless the resource is marked for deletion.
//  4. Describe the job in SageMaker; when it does not exist, either remove the
//     finalizer (resource marked for deletion) or create the job and describe
//     it again.
//  5. Act on the SageMaker job status: request a stop for an in-progress job
//     that is marked for deletion, remove the finalizer from a completed,
//     stopped or failed job that is marked for deletion, and reject statuses
//     that are not recognized.
//  6. Write the SageMaker status and failure reason back to the resource.
//
// It returns nil on success, otherwise the first error encountered. Most
// failures are routed through updateStatusAndReturnError before being returned.
func (r *Reconciler) reconcileProcessingJob(ctx reconcileRequestContext) error {
	var err error

	// hasStatus reports whether the most recent describe call produced a
	// description carrying a status. DescribeProcessingJob may return a nil
	// description without an error, so this must be checked before every
	// dereference of ProcessingJobStatus to avoid a nil-pointer panic.
	hasStatus := func() bool {
		return ctx.ProcessingJobDescription != nil && ctx.ProcessingJobDescription.ProcessingJobStatus != nil
	}

	// Set first-touch status.
	if ctx.ProcessingJob.Status.ProcessingJobStatus == "" {
		if err = r.updateStatus(ctx, controllers.InitializingJobStatus); err != nil {
			return err
		}
	}

	if err = r.initializeContext(&ctx); err != nil {
		return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to initialize operator"))
	}

	markedForDeletion := controllers.HasDeletionTimestamp(ctx.ProcessingJob.ObjectMeta)

	// Add the finalizer if the resource is not marked for deletion.
	if !markedForDeletion && !controllers.ContainsString(ctx.ProcessingJob.ObjectMeta.GetFinalizers(), controllers.SageMakerResourceFinalizerName) {
		ctx.ProcessingJob.ObjectMeta.Finalizers = append(ctx.ProcessingJob.ObjectMeta.Finalizers, controllers.SageMakerResourceFinalizerName)
		if updateErr := r.Update(ctx, ctx.ProcessingJob); updateErr != nil {
			return errors.Wrap(updateErr, "Failed to add finalizer")
		}
		ctx.Log.Info("Finalizer added")
	}

	// Get the ProcessingJob from SageMaker.
	if ctx.ProcessingJobDescription, err = ctx.SageMakerClient.DescribeProcessingJob(ctx, ctx.ProcessingJobName); err != nil {
		return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to describe SageMaker processing job"))
	}

	// The resource does not exist within SageMaker yet.
	if ctx.ProcessingJobDescription == nil {
		if markedForDeletion {
			return r.removeFinalizer(ctx)
		}

		if err = r.createProcessingJob(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to create processing job"))
		}

		if ctx.ProcessingJobDescription, err = ctx.SageMakerClient.DescribeProcessingJob(ctx, ctx.ProcessingJobName); err != nil {
			return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to describe SageMaker processing job"))
		}
	}

	// The describe call may still yield no description (for example right
	// after creation) or a description without a status; report that as an
	// error instead of panicking on the dereference below.
	if !hasStatus() {
		return r.updateStatusAndReturnError(ctx, errors.New("Unable to determine SageMaker processing job status"))
	}

	switch *ctx.ProcessingJobDescription.ProcessingJobStatus {
	case sagemaker.ProcessingJobStatusInProgress:
		if markedForDeletion {
			// Request to stop the job. A 404 from the stop call is tolerated.
			// NOTE(review): the 404 check uses the training-job helper
			// (IsStopTrainingJob404Error); confirm it also matches the error
			// returned by StopProcessingJob.
			if _, stopErr := ctx.SageMakerClient.StopProcessingJob(ctx, ctx.ProcessingJobName); stopErr != nil && !clientwrapper.IsStopTrainingJob404Error(stopErr) {
				return r.updateStatusAndReturnError(ctx, errors.Wrap(stopErr, "Unable to delete processing job"))
			}

			// Describe the new state of the job.
			if ctx.ProcessingJobDescription, err = ctx.SageMakerClient.DescribeProcessingJob(ctx, ctx.ProcessingJobName); err != nil {
				return r.updateStatusAndReturnError(ctx, errors.Wrap(err, "Unable to describe SageMaker processing job"))
			}

			// The refreshed description is dereferenced when the status is
			// written back below, so it needs the same guard.
			if !hasStatus() {
				return r.updateStatusAndReturnError(ctx, errors.New("Unable to determine SageMaker processing job status"))
			}
		}

	case sagemaker.ProcessingJobStatusCompleted, sagemaker.ProcessingJobStatusStopped, sagemaker.ProcessingJobStatusFailed:
		// The job has reached a final state; nothing is left to stop.
		if markedForDeletion {
			return r.removeFinalizer(ctx)
		}

	case sagemaker.ProcessingJobStatusStopping:
		// No action here; the current status is written back below.

	default:
		unknownStateError := errors.New(fmt.Sprintf("Unknown Processing Job Status: %s", *ctx.ProcessingJobDescription.ProcessingJobStatus))
		return r.updateStatusAndReturnError(ctx, unknownStateError)
	}

	// Write the (possibly refreshed) SageMaker status and failure reason back
	// to the resource.
	status := *ctx.ProcessingJobDescription.ProcessingJobStatus
	additional := controllers.GetOrDefault(ctx.ProcessingJobDescription.FailureReason, "")

	return r.updateStatusWithAdditional(ctx, status, additional)
}