in kubernetes/controllers/job.go [96:176]
// UpdateJobStatus derives the ElasticJob's conditions from the per-replica
// counters in jobStatus.ReplicaStatuses and records them in jobStatus.
//
// job must be a *v1alpha1.ElasticJob; any other type yields an error.
// replicas maps each replica type to its spec. For each type the expected
// count is spec.Replicas minus the replicas that have already succeeded.
//
// Per replica type:
//   - Worker: if any worker is active a Running condition is set; if every
//     worker has succeeded, a success event is emitted, CompletionTime is set,
//     the Succeeded condition is recorded and the method returns.
//   - Any type with failed replicas: with RestartPolicyExitCode a warning event
//     and a Restarting condition are recorded; otherwise a failure event is
//     emitted, CompletionTime is set, the Failed condition is recorded and the
//     method returns.
//
// If no terminal state was reached, a Running condition is recorded last.
//
// NOTE(review): jobStatus is presumably a working copy that the caller
// persists afterwards, which is why every mutation (including CompletionTime)
// targets jobStatus and not elasticJob.Status — confirm against the caller.
// NOTE(review): whether the trailing Running update supersedes a Restarting
// condition set in the loop depends on commonutil.UpdateJobConditions — confirm.
func (r *ElasticJobReconciler) UpdateJobStatus(job interface{}, replicas map[v1.ReplicaType]*v1.ReplicaSpec, jobStatus *v1.JobStatus) error {
	elasticJob, ok := job.(*v1alpha1.ElasticJob)
	if !ok {
		// elasticJob is nil when the assertion fails; report what we were given.
		return fmt.Errorf("%+v is not a type of ElasticJob", job)
	}
	log := logger.LoggerForJob(elasticJob)
	for rtype, spec := range replicas {
		if spec == nil || spec.Replicas == nil {
			// Dereferencing a missing replica count would panic the controller.
			return fmt.Errorf("replica spec %s of ElasticJob %s has no replica count", rtype, elasticJob.Name)
		}
		// The status entry for a replica type may be missing; treat that as
		// all-zero counters rather than dereferencing a nil pointer.
		var succeeded, running, failed int32
		if status := jobStatus.ReplicaStatuses[rtype]; status != nil {
			succeeded, running, failed = status.Succeeded, status.Active, status.Failed
		}
		expected := *spec.Replicas - succeeded
		log.Infof("ElasticJob=%s, ReplicaType=%s expected=%d, running=%d, succeeded=%d , failed=%d",
			elasticJob.Name, rtype, expected, running, succeeded, failed)
		if rtype == v1.ReplicaType(v1alpha1.ElasticReplicaTypeWorker) {
			if running > 0 {
				msg := fmt.Sprintf("ElasticJob %s is running.", elasticJob.Name)
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobRunning, ElasticJobRunningReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
			}
			// When all workers succeed, the job is finished.
			if expected == 0 {
				msg := fmt.Sprintf("ElasticJob %s is successfully completed.", elasticJob.Name)
				log.Info(msg)
				r.jobController.Recorder.Event(elasticJob, corev1.EventTypeNormal, ElasticJobSucceededReason, msg)
				if jobStatus.CompletionTime == nil {
					now := metav1.Now()
					jobStatus.CompletionTime = &now
				}
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobSucceeded, ElasticJobSucceededReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
				return nil
			}
		}
		if failed > 0 {
			if spec.RestartPolicy == v1.RestartPolicyExitCode {
				msg := fmt.Sprintf("ElasticJob %s is restarting because %d %s replica(s) failed.", elasticJob.Name, failed, rtype)
				r.jobController.Recorder.Event(elasticJob, corev1.EventTypeWarning, ElasticJobRestartingReason, msg)
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobRestarting, ElasticJobRestartingReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
			} else {
				msg := fmt.Sprintf("ElasticJob %s is failed because %d %s replica(s) failed.", elasticJob.Name, failed, rtype)
				r.jobController.Recorder.Event(elasticJob, corev1.EventTypeNormal, ElasticJobFailedReason, msg)
				// Stamp the status being built (as the success path does), not
				// elasticJob.Status, so the timestamp is not lost.
				if jobStatus.CompletionTime == nil {
					now := metav1.Now()
					jobStatus.CompletionTime = &now
				}
				err := commonutil.UpdateJobConditions(jobStatus, v1.JobFailed, ElasticJobFailedReason, msg)
				if err != nil {
					log.Errorf("Append job condition error: %v", err)
					return err
				}
				// Failed is terminal: stop here so later replica types cannot
				// emit contradictory events or a trailing "running" condition.
				return nil
			}
		}
	}
	// No terminal state was reached, leave a running condition.
	msg := fmt.Sprintf("ElasticJob %s is running.", elasticJob.Name)
	log.Info(msg)
	if err := commonutil.UpdateJobConditions(jobStatus, v1.JobRunning, ElasticJobRunningReason, msg); err != nil {
		log.Errorf("failed to update ElasticJob conditions %v", err)
		return err
	}
	return nil
}