func()

in prow/cmd/sinker/main.go [289:462]


// clean performs one sinker reconciliation pass.
//
//  1. It lists ProwJobs in the configured ProwJob namespace and deletes
//     completed ones whose start time is older than Sinker.MaxProwJobAge.
//     For every periodic that is still present in the config, the latest
//     ProwJob (per pjutil.GetLatestProwJobs) is retained so horologium can
//     keep working.
//  2. For every pod cluster not listed in Sinker.ExcludeClusters, it lists
//     pods labelled as created by Prow and deletes those whose ProwJob is
//     finished and that are either older than Sinker.MaxPodAge or whose
//     ProwJob completed more than Sinker.TerminatedPodTTL ago. Pods reported
//     by isPodOrphaned are deleted regardless of job state.
//  3. It publishes the collected counters to sinkerMetrics.
//
// Failing to list ProwJobs aborts the pass; failing to list pods skips that
// cluster. Individual delete failures are logged and counted.
func (c *controller) clean() {
	// Read the config once so the whole pass uses a single consistent view
	// instead of re-reading it on every access.
	cfg := c.config()

	metrics := sinkerReconciliationMetrics{
		startAt:                time.Now(),
		podsRemoved:            map[string]int{},
		podRemovalErrors:       map[string]int{},
		prowJobsCleaned:        map[string]int{},
		prowJobsCleaningErrors: map[string]int{},
	}

	// Clean up old prow jobs first.
	prowJobs := &prowapi.ProwJobList{}
	if err := c.prowJobClient.List(c.ctx, prowJobs, ctrlruntimeclient.InNamespace(cfg.ProwJobNamespace)); err != nil {
		c.logger.WithError(err).Error("Error listing prow jobs.")
		return
	}
	metrics.prowJobsCreated = len(prowJobs.Items)

	// pjMap indexes every listed ProwJob by name so pods can be matched to
	// their job. isFinished collects completed jobs; only their pods may be
	// deleted by age/TTL (deleting a running job's pod would make plank
	// recreate it).
	pjMap := make(map[string]*prowapi.ProwJob, len(prowJobs.Items))
	isFinished := sets.NewString()

	maxProwJobAge := cfg.Sinker.MaxProwJobAge.Duration
	for i := range prowJobs.Items {
		prowJob := &prowJobs.Items[i]
		pjMap[prowJob.ObjectMeta.Name] = prowJob
		// Periodics are handled separately below.
		if prowJob.Spec.Type == prowapi.PeriodicJob || !prowJob.Complete() {
			continue
		}
		isFinished.Insert(prowJob.ObjectMeta.Name)
		if time.Since(prowJob.Status.StartTime.Time) > maxProwJobAge {
			c.deleteAgedProwJob(prowJob, reasonProwJobAged, &metrics)
		}
	}

	// Keep track of what periodic jobs are in the config so we will
	// not clean up their last prowjob.
	isActivePeriodic := make(map[string]bool, len(cfg.Periodics))
	for _, p := range cfg.Periodics {
		isActivePeriodic[p.Name] = true
	}

	// Get the jobs that we need to retain so horologium can continue working
	// as intended.
	latestPeriodics := pjutil.GetLatestProwJobs(prowJobs.Items, prowapi.PeriodicJob)
	for i := range prowJobs.Items {
		prowJob := &prowJobs.Items[i]
		if prowJob.Spec.Type != prowapi.PeriodicJob || !prowJob.Complete() {
			continue
		}
		// Mark the job finished BEFORE the retention check: only the ProwJob
		// object of the latest active periodic has to be kept for horologium,
		// while its pod must stay eligible for age/TTL cleanup below.
		isFinished.Insert(prowJob.ObjectMeta.Name)

		latestPJ := latestPeriodics[prowJob.Spec.Job]
		if isActivePeriodic[prowJob.Spec.Job] && prowJob.ObjectMeta.Name == latestPJ.ObjectMeta.Name {
			// Retain the latest run of an active periodic.
			continue
		}
		if time.Since(prowJob.Status.StartTime.Time) > maxProwJobAge {
			c.deleteAgedProwJob(prowJob, reasonProwJobAgedPeriodic, &metrics)
		}
	}

	// Now clean up old pods. These limits do not depend on the cluster.
	maxPodAge := cfg.Sinker.MaxPodAge.Duration
	terminatedPodTTL := cfg.Sinker.TerminatedPodTTL.Duration
	for cluster, client := range c.podClients {
		log := c.logger.WithField("cluster", cluster)
		var isClusterExcluded bool
		for _, excludeCluster := range cfg.Sinker.ExcludeClusters {
			if excludeCluster == cluster {
				isClusterExcluded = true
				break
			}
		}
		if isClusterExcluded {
			log.Debugf("Cluster %q is excluded, skipping pods deletion.", cluster)
			continue
		}

		var pods corev1api.PodList
		if err := client.List(c.ctx, &pods, ctrlruntimeclient.MatchingLabels{kube.CreatedByProw: "true"}, ctrlruntimeclient.InNamespace(cfg.PodNamespace)); err != nil {
			log.WithError(err).Error("Error listing pods.")
			continue
		}
		log.WithField("pod-count", len(pods.Items)).Debug("Successfully listed pods.")
		metrics.podsCreated += len(pods.Items)

		for i := range pods.Items {
			pod := &pods.Items[i]

			// By default, use the pod name as the key to match the associated
			// prow job; this supports legacy plank, which may not set the
			// kube.ProwJobIDLabel label. Prefer the label when it is present.
			podJobName := pod.ObjectMeta.Name
			if value, ok := pod.ObjectMeta.Labels[kube.ProwJobIDLabel]; ok {
				podJobName = value
			}
			// Per-pod logger: keep the "pj" field scoped to this pod instead
			// of reassigning the cluster-level logger.
			podLog := log.WithField("pj", podJobName)

			// pj is nil when no ProwJob with this name was listed.
			pj := pjMap[podJobName]
			terminationTime := time.Time{}
			if pj != nil && pj.Complete() {
				terminationTime = pj.Status.CompletionTime.Time
			}

			if podNeedsKubernetesFinalizerCleanup(podLog, pj, pod) {
				if err := c.cleanupKubernetesFinalizer(pod, client); err != nil {
					podLog.WithError(err).Error("Failed to remove kubernetesreporter finalizer")
				}
			}

			reason := ""
			clean := false
			switch {
			case !pod.Status.StartTime.IsZero() && time.Since(pod.Status.StartTime.Time) > maxPodAge:
				clean = true
				reason = reasonPodAged
			case !terminationTime.IsZero() && time.Since(terminationTime) > terminatedPodTTL:
				clean = true
				reason = reasonPodTTLed
			}

			if !isFinished.Has(podJobName) {
				// prowjob exists and is not marked as completed yet
				// deleting the pod now will result in plank creating a brand new pod
				clean = false
			}

			if c.isPodOrphaned(podLog, pod, podJobName) {
				// prowjob has gone, we want to clean orphan pods regardless of the state
				reason = reasonPodOrphaned
				clean = true
			}

			if clean {
				c.deletePod(podLog, pod, reason, client, &metrics)
			}
		}
	}

	metrics.finishedAt = time.Now()
	// NOTE(review): only label values observed in this pass are Set below; a
	// label seen in an earlier pass keeps its previous value. Confirm whether
	// the vectors should be reset first.
	sinkerMetrics.podsCreated.Set(float64(metrics.podsCreated))
	sinkerMetrics.timeUsed.Set(float64(metrics.getTimeUsed().Seconds()))
	for k, v := range metrics.podsRemoved {
		sinkerMetrics.podsRemoved.WithLabelValues(k).Set(float64(v))
	}
	for k, v := range metrics.podRemovalErrors {
		sinkerMetrics.podRemovalErrors.WithLabelValues(k).Set(float64(v))
	}
	sinkerMetrics.prowJobsCreated.Set(float64(metrics.prowJobsCreated))
	for k, v := range metrics.prowJobsCleaned {
		sinkerMetrics.prowJobsCleaned.WithLabelValues(k).Set(float64(v))
	}
	for k, v := range metrics.prowJobsCleaningErrors {
		sinkerMetrics.prowJobsCleaningErrors.WithLabelValues(k).Set(float64(v))
	}
	version.GatherProwVersion(c.logger)
	c.logger.Info("Sinker reconciliation complete.")
}

// deleteAgedProwJob deletes pj and records the outcome in metrics: on success
// it increments prowJobsCleaned[reason]; on failure it increments
// prowJobsCleaningErrors keyed by the Kubernetes error reason. Errors are
// logged, not returned, so one failed deletion does not stop the pass.
func (c *controller) deleteAgedProwJob(pj *prowapi.ProwJob, reason string, metrics *sinkerReconciliationMetrics) {
	log := c.logger.WithFields(pjutil.ProwJobFields(pj))
	if err := c.prowJobClient.Delete(c.ctx, pj); err != nil {
		log.WithError(err).Error("Error deleting prowjob.")
		metrics.prowJobsCleaningErrors[string(k8serrors.ReasonForError(err))]++
		return
	}
	log.Info("Deleted prowjob.")
	metrics.prowJobsCleaned[reason]++
}