in controllers/trainingjob/trainingjob_controller.go [135:247]
// reconcileTrainingJob performs one reconciliation pass for a TrainingJob resource.
//
// On first touch it records controllers.InitializingJobStatus. It then initializes the
// request context and adds the SageMaker finalizer while the resource is not being
// deleted. Finally it compares the resource with the SageMaker training job named
// ctx.TrainingJobName:
//   - if SageMaker has no such job, it is created, or, when the resource is being
//     deleted, the finalizer is removed;
//   - if the job is InProgress and the resource is being deleted, a stop is requested;
//   - if the job is Completed, its model path is added to the status;
//   - if the job is Completed, Stopped or Failed and the resource is being deleted, the
//     finalizer is removed.
//
// Debug rule evaluation statuses and the primary/secondary status and failure reason
// reported by SageMaker are copied into the resource status. Most failures are reported
// through updateStatusAndReturnError with ReconcilingTrainingJobStatus, and the resulting
// error is returned to the caller.
func (r *Reconciler) reconcileTrainingJob(ctx reconcileRequestContext) error {
	var err error

	// Set first-touch status.
	if ctx.TrainingJob.Status.TrainingJobStatus == "" {
		if err = r.updateStatus(ctx, controllers.InitializingJobStatus, ""); err != nil {
			return err
		}
	}

	if err = r.initializeContext(&ctx); err != nil {
		return r.updateStatusAndReturnError(ctx, string(sagemaker.TrainingJobStatusFailed), "", errors.Wrap(err, "Unable to initialize operator"))
	}

	// Add the finalizer while the resource is live. It is only removed below once the
	// SageMaker job is gone or has reached a terminal state.
	if !controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) &&
		!controllers.ContainsString(ctx.TrainingJob.ObjectMeta.GetFinalizers(), controllers.SageMakerResourceFinalizerName) {
		ctx.TrainingJob.ObjectMeta.Finalizers = append(ctx.TrainingJob.ObjectMeta.Finalizers, controllers.SageMakerResourceFinalizerName)
		if err := r.Update(ctx, ctx.TrainingJob); err != nil {
			return errors.Wrap(err, "Failed to add finalizer")
		}
		ctx.Log.Info("Finalizer added")
	}

	// Get the TrainingJob from SageMaker; a nil description means SageMaker has no such job.
	if ctx.TrainingJobDescription, err = ctx.SageMakerClient.DescribeTrainingJob(ctx, ctx.TrainingJobName); err != nil {
		return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to describe SageMaker training job"))
	}

	// The resource does not exist within SageMaker yet.
	if ctx.TrainingJobDescription == nil {
		if controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) {
			// Nothing exists in SageMaker to clean up, so release the resource.
			return r.removeFinalizer(ctx)
		}
		if err = r.createTrainingJob(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to create training job"))
		}
		if ctx.TrainingJobDescription, err = ctx.SageMakerClient.DescribeTrainingJob(ctx, ctx.TrainingJobName); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to describe SageMaker training job"))
		}
		// Everything below dereferences the description. If the newly created job is not
		// visible yet, report an error instead of panicking on a nil pointer.
		if ctx.TrainingJobDescription == nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.New("Unable to find SageMaker training job after creation"))
		}
	}

	// Mirror SageMaker's debug rule evaluation statuses into the resource status when the
	// number of rules or any individual rule status differs. The k8s status starts out
	// empty, so the first pass copies every rule.
	observedRules := ctx.TrainingJobDescription.DebugRuleEvaluationStatuses
	recordedRules := ctx.TrainingJob.Status.DebugRuleEvaluationStatuses
	debugStatusesChanged := len(observedRules) != len(recordedRules)
	for i := 0; !debugStatusesChanged && i < len(observedRules); i++ {
		// GetOrDefault on both sides tolerates a nil RuleEvaluationStatus.
		debugStatusesChanged = controllers.GetOrDefault(observedRules[i].RuleEvaluationStatus, "") !=
			controllers.GetOrDefault(recordedRules[i].RuleEvaluationStatus, "")
	}
	if debugStatusesChanged {
		if err = r.addDebugRuleEvaluationStatusesToStatus(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to add debug job statuses to status"))
		}
	}

	switch jobStatus := controllers.GetOrDefault(ctx.TrainingJobDescription.TrainingJobStatus, ""); jobStatus {
	case sagemaker.TrainingJobStatusInProgress:
		if controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) {
			// Request to stop the job; errors matched by IsStopTrainingJob404Error are tolerated.
			if _, err := ctx.SageMakerClient.StopTrainingJob(ctx, ctx.TrainingJobName); err != nil && !clientwrapper.IsStopTrainingJob404Error(err) {
				return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to delete training job"))
			}
			// Describe the new state of the job so the status written below reflects the stop request.
			if ctx.TrainingJobDescription, err = ctx.SageMakerClient.DescribeTrainingJob(ctx, ctx.TrainingJobName); err != nil {
				return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to describe SageMaker training job"))
			}
			// The job no longer exists in SageMaker. As in the not-found path above, release
			// the resource rather than dereferencing a nil description.
			if ctx.TrainingJobDescription == nil {
				return r.removeFinalizer(ctx)
			}
		}
	case sagemaker.TrainingJobStatusCompleted:
		if err = r.addModelPathToStatus(ctx); err != nil {
			return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", errors.Wrap(err, "Unable to add model path to status"))
		}
		fallthrough
	case sagemaker.TrainingJobStatusStopped, sagemaker.TrainingJobStatusFailed:
		// Terminal states: a resource marked for deletion can be released.
		if controllers.HasDeletionTimestamp(ctx.TrainingJob.ObjectMeta) {
			return r.removeFinalizer(ctx)
		}
	case sagemaker.TrainingJobStatusStopping:
		// Nothing to do until SageMaker finishes stopping; the status is still recorded below.
	default:
		return r.updateStatusAndReturnError(ctx, ReconcilingTrainingJobStatus, "", fmt.Errorf("Unknown Training Job Status: %s", jobStatus))
	}

	// Re-read the statuses: the description may have been refreshed after a stop request.
	primaryStatus := controllers.GetOrDefault(ctx.TrainingJobDescription.TrainingJobStatus, "")
	secondaryStatus := controllers.GetOrDefault(ctx.TrainingJobDescription.SecondaryStatus, "")
	additional := controllers.GetOrDefault(ctx.TrainingJobDescription.FailureReason, "")
	if primaryStatus == string(sagemaker.TrainingJobStatusStopping) {
		// Clear the secondary status if we detected stopping, since SageMaker has unclear secondary statuses during this phase
		// Open ticket with the SageMaker team: https://t.corp.amazon.com/0411302791
		secondaryStatus = ""
	}
	return r.updateStatusWithAdditional(ctx, primaryStatus, secondaryStatus, additional)
}