func()

in kubernetes/controllers/job.go [96:176]


// UpdateJobStatus recomputes the conditions of an ElasticJob from the
// per-replica-type counters in jobStatus.ReplicaStatuses.
//
// job must be a *v1alpha1.ElasticJob; any other value yields an error.
// replicas maps each replica type to its spec. jobStatus is updated in place:
//   - for the worker replica type, Active > 0 records a Running condition, and
//     once every desired worker has succeeded CompletionTime is set (if unset),
//     a Succeeded condition is recorded and the function returns immediately;
//   - for any replica type with Failed > 0, a Restarting condition is recorded
//     when the spec's restart policy is ExitCode; otherwise CompletionTime is
//     set (if unset) and a Failed condition is recorded;
//   - finally a Running condition is recorded.
//
// Events are emitted on the job for success, restart and failure. Errors from
// commonutil.UpdateJobConditions are logged and returned.
func (r *ElasticJobReconciler) UpdateJobStatus(job interface{}, replicas map[v1.ReplicaType]*v1.ReplicaSpec, jobStatus *v1.JobStatus) error {
	elasticJob, ok := job.(*v1alpha1.ElasticJob)
	if !ok {
		// elasticJob is nil when the assertion fails, so report the value that
		// was actually passed in.
		return fmt.Errorf("%+v is not a type of ElasticJob", job)
	}

	log := logger.LoggerForJob(elasticJob)

	for rtype, spec := range replicas {
		status := jobStatus.ReplicaStatuses[rtype]
		if status == nil {
			// No status recorded for this replica type yet: treat every counter
			// as zero rather than dereferencing a nil pointer.
			status = &v1.ReplicaStatus{}
		}

		succeeded := status.Succeeded
		expected := *(spec.Replicas) - succeeded
		running := status.Active
		failed := status.Failed

		log.Infof("ElasticJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
			elasticJob.Name, rtype, expected, running, succeeded, failed)

		if rtype == v1.ReplicaType(v1alpha1.ElasticReplicaTypeWorker) {
			if running > 0 {
				msg := fmt.Sprintf("ElasticJob %s is running.", elasticJob.Name)
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobRunning, ElasticJobRunningReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
			}
			// When all workers succeed, the job is finished. Use <= so that a
			// succeeded count above the desired replica count (e.g. after the
			// spec is scaled down) still completes the job instead of leaving
			// it stuck.
			if expected <= 0 {
				msg := fmt.Sprintf("ElasticJob %s is successfully completed.", elasticJob.Name)
				log.Info(msg)
				r.jobController.Recorder.Event(elasticJob, corev1.EventTypeNormal, ElasticJobSucceededReason, msg)
				if jobStatus.CompletionTime == nil {
					now := metav1.Now()
					jobStatus.CompletionTime = &now
				}
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobSucceeded, ElasticJobSucceededReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
				return nil
			}
		}
		if failed > 0 {
			if spec.RestartPolicy == v1.RestartPolicyExitCode {
				msg := fmt.Sprintf("ElasticJob %s is restarting because %d %s replica(s) failed.", elasticJob.Name, failed, rtype)
				r.jobController.Recorder.Event(elasticJob, corev1.EventTypeWarning, ElasticJobRestartingReason, msg)
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobRestarting, ElasticJobRestartingReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
			} else {
				msg := fmt.Sprintf("ElasticJob %s is failed because %d %s replica(s) failed.", elasticJob.Name, failed, rtype)
				r.jobController.Recorder.Event(elasticJob, corev1.EventTypeNormal, ElasticJobFailedReason, msg)
				// Record the completion time on jobStatus, as the success branch
				// does: that is the status this function is asked to update.
				// Writing elasticJob.Status directly may be lost if the caller
				// persists jobStatus.
				if jobStatus.CompletionTime == nil {
					now := metav1.Now()
					jobStatus.CompletionTime = &now
				}
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobFailed, ElasticJobFailedReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
			}
		}
	}

	// Some workers are still running, leave a running condition.
	// NOTE(review): this runs even when a replica failed above. Depending on
	// how commonutil.UpdateJobConditions merges conditions, it may supersede a
	// Restarting condition recorded in this same pass. Confirm this is intended.
	msg := fmt.Sprintf("ElasticJob %s is running.", elasticJob.Name)
	// Info, not Infof: msg embeds the job name and must not be parsed as a
	// format string.
	log.Info(msg)

	if err := commonutil.UpdateJobConditions(jobStatus, v1.JobRunning, ElasticJobRunningReason, msg); err != nil {
		log.Errorf("failed to update ElasticJob conditions %v", err)
		return err
	}

	return nil
}