in prow/plank/reconciler.go [332:558]
func (r *reconciler) syncPendingJob(ctx context.Context, pj *prowv1.ProwJob) (*reconcile.Result, error) {
prevPJ := pj.DeepCopy()
pod, podExists, err := r.pod(ctx, pj)
if err != nil {
return nil, err
}
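	// Note: r.pod is assumed to return podExists == false together with a nil pod when no
	// pod for this ProwJob is found, rather than surfacing a NotFound error.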
if !podExists {
// Pod is missing. This can happen in case the previous pod was deleted manually or by
// a rescheduler. Start a new pod.
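		// startPod is assumed to allocate a fresh build ID, create the pod in the job's
		// build cluster, and return the ID together with the pod's name.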
id, pn, err := r.startPod(ctx, pj)
if err != nil {
if !isRequestError(err) {
return nil, fmt.Errorf("error starting pod %s: %w", pod.Name, err)
}
pj.Status.State = prowv1.ErrorState
pj.SetComplete()
pj.Status.Description = fmt.Sprintf("Pod can not be created: %v", err)
r.log.WithFields(pjutil.ProwJobFields(pj)).WithError(err).Warning("Unprocessable pod.")
} else {
pj.Status.BuildID = id
pj.Status.PodName = pn
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pod is missing, starting a new pod")
}
} else {
switch pod.Status.Phase {
case corev1.PodUnknown:
// Pod is in Unknown state. This can happen if there is a problem with
// the node. Delete the old pod, this will fire an event that triggers
// a new reconciliation in which we will re-create the pod.
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pod is in unknown state, deleting & restarting pod")
client, ok := r.buildClients[pj.ClusterAlias()]
if !ok {
return nil, fmt.Errorf("unknown pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias())
}
if finalizers := sets.NewString(pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
				// Remove the kubernetes reporter's finalizer, otherwise the deleted pod hangs
				// around in Terminating and stays visible to the end user.
oldPod := pod.DeepCopy()
pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
return nil, fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
}
}
r.log.WithField("name", pj.ObjectMeta.Name).Debug("Delete Pod.")
return nil, ctrlruntimeclient.IgnoreNotFound(client.Delete(ctx, pod))
case corev1.PodSucceeded:
pj.SetComplete()
			// There were bugs around this in the past, so be paranoid and verify each container:
			// https://github.com/kubernetes/kubernetes/issues/58711 is only fixed in 1.18+.
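			// didPodSucceed is expected to check that every container actually terminated
			// with exit code 0 instead of trusting the pod phase alone.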
if didPodSucceed(pod) {
// Pod succeeded. Update ProwJob and talk to GitHub.
pj.Status.State = prowv1.SuccessState
pj.Status.Description = "Job succeeded."
} else {
pj.Status.State = prowv1.ErrorState
pj.Status.Description = "Pod was in succeeded phase but some containers didn't finish"
}
case corev1.PodFailed:
if pod.Status.Reason == Evicted {
// Pod was evicted.
if pj.Spec.ErrorOnEviction {
// ErrorOnEviction is enabled, complete the PJ and mark it as errored.
pj.SetComplete()
pj.Status.State = prowv1.ErrorState
pj.Status.Description = "Job pod was evicted by the cluster."
break
}
// ErrorOnEviction is disabled. Delete the pod now and recreate it in
// the next resync.
client, ok := r.buildClients[pj.ClusterAlias()]
if !ok {
return nil, fmt.Errorf("evicted pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias())
}
if finalizers := sets.NewString(pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
					// Remove the kubernetes reporter's finalizer, otherwise the deleted pod hangs
					// around in Terminating and stays visible to the end user.
oldPod := pod.DeepCopy()
pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
return nil, fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
}
}
r.log.WithField("name", pj.ObjectMeta.Name).Debug("Delete Pod.")
return nil, ctrlruntimeclient.IgnoreNotFound(client.Delete(ctx, pod))
}
// Pod failed. Update ProwJob, talk to GitHub.
pj.SetComplete()
pj.Status.State = prowv1.FailureState
pj.Status.Description = "Job failed."
case corev1.PodPending:
var requeueAfter time.Duration
maxPodPending := r.config().Plank.PodPendingTimeout.Duration
maxPodUnscheduled := r.config().Plank.PodUnscheduledTimeout.Duration
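			// pod.Status.StartTime is only set once the kubelet has acknowledged the pod,
			// so a zero value means the pod has not been scheduled onto a node yet.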
if pod.Status.StartTime.IsZero() {
if time.Since(pod.CreationTimestamp.Time) >= maxPodUnscheduled {
					// Pod is stuck in unscheduled state longer than maxPodUnscheduled;
					// abort the job and talk to GitHub.
pj.SetComplete()
pj.Status.State = prowv1.ErrorState
pj.Status.Description = "Pod scheduling timeout."
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Marked job for stale unscheduled pod as errored.")
if err := r.deletePod(ctx, pj); err != nil {
return nil, fmt.Errorf("failed to delete pod %s/%s in cluster %s: %w", pod.Namespace, pod.Name, pj.ClusterAlias(), err)
}
break
} else {
// We have to re-check on the pod once we reached maxPodUnscheduled to
// be able to fail the job if it didn't get scheduled by then.
requeueAfter = maxPodUnscheduled - time.Since(pod.CreationTimestamp.Time)
}
} else {
if time.Since(pod.Status.StartTime.Time) >= maxPodPending {
					// Pod is stuck in pending state longer than maxPodPending;
					// abort the job and talk to GitHub.
pj.SetComplete()
pj.Status.State = prowv1.ErrorState
pj.Status.Description = "Pod pending timeout."
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Marked job for stale pending pod as errored.")
if err := r.deletePod(ctx, pj); err != nil {
return nil, fmt.Errorf("failed to delete pod %s/%s in cluster %s: %w", pod.Namespace, pod.Name, pj.ClusterAlias(), err)
}
break
} else {
// We have to re-check on the pod once we reached maxPodPending to
// be able to fail the job if it didn't start running by then.
requeueAfter = maxPodPending - time.Since(pod.Status.StartTime.Time)
}
}
			// Pod didn't start but hasn't reached the scheduling or pending timeout yet;
			// requeue so we check on it again once the timeout is reached. If the pod is
			// already being deleted, fall through to the terminating-pod handling below.
if pod.DeletionTimestamp == nil {
return &reconcile.Result{RequeueAfter: requeueAfter}, nil
}
case corev1.PodRunning:
if pod.DeletionTimestamp != nil {
break
}
maxPodRunning := r.config().Plank.PodRunningTimeout.Duration
if pod.Status.StartTime.IsZero() || time.Since(pod.Status.StartTime.Time) < maxPodRunning {
// Pod is still running. Do nothing.
return nil, nil
}
			// Pod is stuck in running state longer than maxPodRunning;
			// abort the job and talk to GitHub.
pj.SetComplete()
pj.Status.State = prowv1.AbortedState
pj.Status.Description = "Pod running timeout."
if err := r.deletePod(ctx, pj); err != nil {
return nil, fmt.Errorf("failed to delete pod %s/%s in cluster %s: %w", pod.Namespace, pod.Name, pj.ClusterAlias(), err)
}
default:
if pod.DeletionTimestamp == nil {
				// Pod is in some other phase and not terminating; nothing to do.
return nil, nil
}
}
}
	// This can happen in any phase and means the pod's node became unresponsive and was
	// removed. Delete the finalizer so the pod vanishes and we will silently re-create it
	// in the next iteration.
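	// ("NodeLost" is the status reason the node lifecycle controller has historically set
	// on pods whose node stopped reporting status.)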
if pod != nil && pod.DeletionTimestamp != nil && pod.Status.Reason == "NodeLost" {
r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pods Node got lost, deleting & restarting pod")
client, ok := r.buildClients[pj.ClusterAlias()]
if !ok {
return nil, fmt.Errorf("unknown pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias())
}
if finalizers := sets.NewString(pod.Finalizers...); finalizers.Has(kubernetesreporterapi.FinalizerName) {
			// Remove the kubernetes reporter's finalizer, otherwise the deleted pod hangs
			// around in Terminating and stays visible to the end user.
oldPod := pod.DeepCopy()
pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
return nil, fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
}
}
return nil, nil
}
// If a pod gets deleted unexpectedly, it might be in any phase and will stick around until
// we complete the job if the kubernetes reporter is used, because it sets a finalizer.
if !pj.Complete() && pod != nil && pod.DeletionTimestamp != nil {
pj.SetComplete()
pj.Status.State = prowv1.ErrorState
pj.Status.Description = "Pod got deleted unexpectedly"
}
pj.Status.URL, err = pjutil.JobURL(r.config().Plank, *pj, r.log)
if err != nil {
r.log.WithFields(pjutil.ProwJobFields(pj)).WithError(err).Warn("failed to get jobURL")
}
if prevPJ.Status.State != pj.Status.State {
r.log.WithFields(pjutil.ProwJobFields(pj)).
WithField("from", prevPJ.Status.State).
WithField("to", pj.Status.State).Info("Transitioning states.")
}
if err := r.pjClient.Patch(ctx, pj.DeepCopy(), ctrlruntimeclient.MergeFrom(prevPJ)); err != nil {
return nil, fmt.Errorf("patching prowjob: %w", err)
}
// If the ProwJob state has changed, we must ensure that the update reaches the cache before
// processing the key again. Without this we might accidentally replace intentionally deleted pods
// or otherwise incorrectly react to stale ProwJob state.
state := pj.Status.State
if prevPJ.Status.State == state {
return nil, nil
}
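	// The poll below assumes the controller-runtime client reads from an informer cache;
	// with a client that talks to the API server directly this wait would be unnecessary.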
nn := types.NamespacedName{Namespace: pj.Namespace, Name: pj.Name}
if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (bool, error) {
if err := r.pjClient.Get(ctx, nn, pj); err != nil {
return false, fmt.Errorf("failed to get prowjob: %w", err)
}
return pj.Status.State == state, nil
}); err != nil {
return nil, fmt.Errorf("failed to wait for cached prowjob %s to get into state %s: %w", nn.String(), state, err)
}
return nil, nil
}
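
// The finalizer-removal block above appears three times verbatim. A sketch of a shared
// helper that all three call sites could use (hypothetical; no such helper exists in this
// file today):
func removeKubernetesReporterFinalizer(ctx context.Context, client ctrlruntimeclient.Client, pod *corev1.Pod) error {
	finalizers := sets.NewString(pod.Finalizers...)
	if !finalizers.Has(kubernetesreporterapi.FinalizerName) {
		// Finalizer not present, nothing to remove.
		return nil
	}
	oldPod := pod.DeepCopy()
	pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
	if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
		return fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
	}
	return nil
}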