// Excerpt from prow/plank/reconciler.go (original lines 332-558).

// syncPendingJob reconciles a ProwJob that is in a pending (pod-level) state:
// it starts a pod if one is missing, maps terminal pod phases onto the ProwJob
// status, enforces the unscheduled/pending/running timeouts, and patches the
// ProwJob when its status changed. A non-nil *reconcile.Result requests a
// re-check once the remaining timeout for a pending/unscheduled pod elapses.
func (r *reconciler) syncPendingJob(ctx context.Context, pj *prowv1.ProwJob) (*reconcile.Result, error) {
	prevPJ := pj.DeepCopy()

	pod, podExists, err := r.pod(ctx, pj)
	if err != nil {
		return nil, err
	}

	if !podExists {
		// Pod is missing. This can happen in case the previous pod was deleted manually or by
		// a rescheduler. Start a new pod.
		id, pn, err := r.startPod(ctx, pj)
		if err != nil {
			if !isRequestError(err) {
				// The pod does not exist on this branch, so reference the
				// ProwJob instead of dereferencing the (possibly nil) pod.
				return nil, fmt.Errorf("error starting pod for prowjob %s: %w", pj.Name, err)
			}
			// A request error (e.g. an unprocessable pod spec) will not go away
			// on retry, so mark the job as errored instead of requeueing forever.
			pj.Status.State = prowv1.ErrorState
			pj.SetComplete()
			pj.Status.Description = fmt.Sprintf("Pod can not be created: %v", err)
			r.log.WithFields(pjutil.ProwJobFields(pj)).WithError(err).Warning("Unprocessable pod.")
		} else {
			pj.Status.BuildID = id
			pj.Status.PodName = pn
			r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pod is missing, starting a new pod")
		}
	} else {

		switch pod.Status.Phase {
		case corev1.PodUnknown:
			// Pod is in Unknown state. This can happen if there is a problem with
			// the node. Delete the old pod, this will fire an event that triggers
			// a new reconciliation in which we will re-create the pod.
			r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pod is in unknown state, deleting & restarting pod")
			client, ok := r.buildClients[pj.ClusterAlias()]
			if !ok {
				return nil, fmt.Errorf("unknown pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias())
			}
			if err := r.removePodFinalizer(ctx, client, pod); err != nil {
				return nil, err
			}
			r.log.WithField("name", pj.ObjectMeta.Name).Debug("Delete Pod.")
			return nil, ctrlruntimeclient.IgnoreNotFound(client.Delete(ctx, pod))

		case corev1.PodSucceeded:
			pj.SetComplete()
			// There were bugs around this in the past so be paranoid and verify each container
			// https://github.com/kubernetes/kubernetes/issues/58711 is only fixed in 1.18+
			if didPodSucceed(pod) {
				// Pod succeeded. Update ProwJob and talk to GitHub.
				pj.Status.State = prowv1.SuccessState
				pj.Status.Description = "Job succeeded."
			} else {
				pj.Status.State = prowv1.ErrorState
				pj.Status.Description = "Pod was in succeeded phase but some containers didn't finish"
			}

		case corev1.PodFailed:
			if pod.Status.Reason == Evicted {
				// Pod was evicted.
				if pj.Spec.ErrorOnEviction {
					// ErrorOnEviction is enabled, complete the PJ and mark it as errored.
					pj.SetComplete()
					pj.Status.State = prowv1.ErrorState
					pj.Status.Description = "Job pod was evicted by the cluster."
					break
				}
				// ErrorOnEviction is disabled. Delete the pod now and recreate it in
				// the next resync.
				client, ok := r.buildClients[pj.ClusterAlias()]
				if !ok {
					return nil, fmt.Errorf("evicted pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias())
				}
				if err := r.removePodFinalizer(ctx, client, pod); err != nil {
					return nil, err
				}
				r.log.WithField("name", pj.ObjectMeta.Name).Debug("Delete Pod.")
				return nil, ctrlruntimeclient.IgnoreNotFound(client.Delete(ctx, pod))
			}
			// Pod failed. Update ProwJob, talk to GitHub.
			pj.SetComplete()
			pj.Status.State = prowv1.FailureState
			pj.Status.Description = "Job failed."

		case corev1.PodPending:
			var requeueAfter time.Duration
			maxPodPending := r.config().Plank.PodPendingTimeout.Duration
			maxPodUnscheduled := r.config().Plank.PodUnscheduledTimeout.Duration
			if pod.Status.StartTime.IsZero() {
				// StartTime is unset until the pod is scheduled onto a node, so
				// the unscheduled timeout applies here.
				if time.Since(pod.CreationTimestamp.Time) >= maxPodUnscheduled {
					// Pod is stuck in unscheduled state longer than maxPodUnscheduled
					// abort the job, and talk to GitHub
					pj.SetComplete()
					pj.Status.State = prowv1.ErrorState
					pj.Status.Description = "Pod scheduling timeout."
					r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Marked job for stale unscheduled pod as errored.")
					if err := r.deletePod(ctx, pj); err != nil {
						return nil, fmt.Errorf("failed to delete pod %s/%s in cluster %s: %w", pod.Namespace, pod.Name, pj.ClusterAlias(), err)
					}
					break
				} else {
					// We have to re-check on the pod once we reached maxPodUnscheduled to
					// be able to fail the job if it didn't get scheduled by then.
					requeueAfter = maxPodUnscheduled - time.Since(pod.CreationTimestamp.Time)
				}
			} else {
				if time.Since(pod.Status.StartTime.Time) >= maxPodPending {
					// Pod is stuck in pending state longer than maxPodPending
					// abort the job, and talk to GitHub
					pj.SetComplete()
					pj.Status.State = prowv1.ErrorState
					pj.Status.Description = "Pod pending timeout."
					r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Marked job for stale pending pod as errored.")
					if err := r.deletePod(ctx, pj); err != nil {
						return nil, fmt.Errorf("failed to delete pod %s/%s in cluster %s: %w", pod.Namespace, pod.Name, pj.ClusterAlias(), err)
					}
					break
				} else {
					// We have to re-check on the pod once we reached maxPodPending to
					// be able to fail the job if it didn't start running by then.
					requeueAfter = maxPodPending - time.Since(pod.Status.StartTime.Time)
				}
			}
			// Pod didn't start but didn't reach the scheduling or pending timeout yet,
			// do nothing but check on it again once the timeout is reached.
			// A pod with a DeletionTimestamp instead falls through to the
			// deleted-pod handling below.
			if pod.DeletionTimestamp == nil {
				return &reconcile.Result{RequeueAfter: requeueAfter}, nil
			}
		case corev1.PodRunning:
			if pod.DeletionTimestamp != nil {
				// Fall through to the deleted-pod handling below.
				break
			}
			maxPodRunning := r.config().Plank.PodRunningTimeout.Duration
			if pod.Status.StartTime.IsZero() || time.Since(pod.Status.StartTime.Time) < maxPodRunning {
				// Pod is still running. Do nothing.
				return nil, nil
			}

			// Pod is stuck in running state longer than maxPodRunning
			// abort the job, and talk to GitHub
			pj.SetComplete()
			pj.Status.State = prowv1.AbortedState
			pj.Status.Description = "Pod running timeout."
			if err := r.deletePod(ctx, pj); err != nil {
				return nil, fmt.Errorf("failed to delete pod %s/%s in cluster %s: %w", pod.Namespace, pod.Name, pj.ClusterAlias(), err)
			}
		default:
			if pod.DeletionTimestamp == nil {
				// other states, ignore
				return nil, nil
			}
		}
	}

	// This can happen in any phase and means the node got evicted after it became unresponsive. Delete the finalizer so the pod
	// vanishes and we will silently re-create it in the next iteration.
	if pod != nil && pod.DeletionTimestamp != nil && pod.Status.Reason == "NodeLost" {
		r.log.WithFields(pjutil.ProwJobFields(pj)).Info("Pods Node got lost, deleting & restarting pod")
		client, ok := r.buildClients[pj.ClusterAlias()]
		if !ok {
			return nil, fmt.Errorf("unknown pod %s: unknown cluster alias %q", pod.Name, pj.ClusterAlias())
		}
		if err := r.removePodFinalizer(ctx, client, pod); err != nil {
			return nil, err
		}

		return nil, nil
	}

	// If a pod gets deleted unexpectedly, it might be in any phase and will stick around until
	// we complete the job if the kubernetes reporter is used, because it sets a finalizer.
	if !pj.Complete() && pod != nil && pod.DeletionTimestamp != nil {
		pj.SetComplete()
		pj.Status.State = prowv1.ErrorState
		pj.Status.Description = "Pod got deleted unexpectedly"
	}

	pj.Status.URL, err = pjutil.JobURL(r.config().Plank, *pj, r.log)
	if err != nil {
		r.log.WithFields(pjutil.ProwJobFields(pj)).WithError(err).Warn("failed to get jobURL")
	}

	if prevPJ.Status.State != pj.Status.State {
		r.log.WithFields(pjutil.ProwJobFields(pj)).
			WithField("from", prevPJ.Status.State).
			WithField("to", pj.Status.State).Info("Transitioning states.")
	}

	// Patch against the pre-sync snapshot so only our changes are sent.
	if err := r.pjClient.Patch(ctx, pj.DeepCopy(), ctrlruntimeclient.MergeFrom(prevPJ)); err != nil {
		return nil, fmt.Errorf("patching prowjob: %w", err)
	}

	// If the ProwJob state has changed, we must ensure that the update reaches the cache before
	// processing the key again. Without this we might accidentally replace intentionally deleted pods
	// or otherwise incorrectly react to stale ProwJob state.
	state := pj.Status.State
	if prevPJ.Status.State == state {
		return nil, nil
	}
	nn := types.NamespacedName{Namespace: pj.Namespace, Name: pj.Name}
	if err := wait.Poll(100*time.Millisecond, 2*time.Second, func() (bool, error) {
		if err := r.pjClient.Get(ctx, nn, pj); err != nil {
			return false, fmt.Errorf("failed to get prowjob: %w", err)
		}
		return pj.Status.State == state, nil
	}); err != nil {
		return nil, fmt.Errorf("failed to wait for cached prowjob %s to get into state %s: %w", nn.String(), state, err)
	}

	return nil, nil
}

// removePodFinalizer strips the kubernetes reporter's finalizer from pod, if
// present, so a deleted pod can actually go away instead of hanging in
// Terminating. We want the end user to not see the finalizer-induced hang, so
// it must be removed before (or instead of) deleting the pod. No-op when the
// finalizer is absent.
func (r *reconciler) removePodFinalizer(ctx context.Context, client ctrlruntimeclient.Client, pod *corev1.Pod) error {
	finalizers := sets.NewString(pod.Finalizers...)
	if !finalizers.Has(kubernetesreporterapi.FinalizerName) {
		return nil
	}
	oldPod := pod.DeepCopy()
	pod.Finalizers = finalizers.Delete(kubernetesreporterapi.FinalizerName).UnsortedList()
	if err := client.Patch(ctx, pod, ctrlruntimeclient.MergeFrom(oldPod)); err != nil {
		return fmt.Errorf("failed to patch pod trying to remove %s finalizer: %w", kubernetesreporterapi.FinalizerName, err)
	}
	return nil
}