in prow/cmd/sinker/main.go [289:462]
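// clean deletes completed ProwJobs that have outlived MaxProwJobAge, cleans up
// their pods based on MaxPodAge and TerminatedPodTTL as well as orphaned pods,
// and then publishes the per-run sinker metrics.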
func (c *controller) clean() {
	metrics := sinkerReconciliationMetrics{
		startAt:                time.Now(),
		podsRemoved:            map[string]int{},
		podRemovalErrors:       map[string]int{},
		prowJobsCleaned:        map[string]int{},
		prowJobsCleaningErrors: map[string]int{},
	}

	// Clean up old prow jobs first.
	prowJobs := &prowapi.ProwJobList{}
	if err := c.prowJobClient.List(c.ctx, prowJobs, ctrlruntimeclient.InNamespace(c.config().ProwJobNamespace)); err != nil {
		c.logger.WithError(err).Error("Error listing prow jobs.")
		return
	}
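	// Record how many ProwJobs were found in this pass.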
	metrics.prowJobsCreated = len(prowJobs.Items)

	// Only delete a pod if its ProwJob is marked as finished.
	pjMap := map[string]*prowapi.ProwJob{}
	isFinished := sets.NewString()

	maxProwJobAge := c.config().Sinker.MaxProwJobAge.Duration
	for i, prowJob := range prowJobs.Items {
		pjMap[prowJob.ObjectMeta.Name] = &prowJobs.Items[i]
		// Handle periodics separately.
		if prowJob.Spec.Type == prowapi.PeriodicJob {
			continue
		}
		if !prowJob.Complete() {
			continue
		}
		isFinished.Insert(prowJob.ObjectMeta.Name)
		if time.Since(prowJob.Status.StartTime.Time) <= maxProwJobAge {
			continue
		}
		// The ProwJob is complete and older than maxProwJobAge; delete it.
		if err := c.prowJobClient.Delete(c.ctx, &prowJob); err == nil {
			c.logger.WithFields(pjutil.ProwJobFields(&prowJob)).Info("Deleted prowjob.")
			metrics.prowJobsCleaned[reasonProwJobAged]++
		} else {
			c.logger.WithFields(pjutil.ProwJobFields(&prowJob)).WithError(err).Error("Error deleting prowjob.")
			metrics.prowJobsCleaningErrors[string(k8serrors.ReasonForError(err))]++
		}
	}
	// Keep track of which periodic jobs are in the config so we do
	// not clean up their latest prowjob.
	isActivePeriodic := make(map[string]bool)
	for _, p := range c.config().Periodics {
		isActivePeriodic[p.Name] = true
	}

	// Get the jobs that we need to retain so horologium can continue working
	// as intended.
	latestPeriodics := pjutil.GetLatestProwJobs(prowJobs.Items, prowapi.PeriodicJob)
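	// Periodic jobs: always retain the most recent run of every periodic that
	// is still present in the config, even if it is old.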
	for _, prowJob := range prowJobs.Items {
		if prowJob.Spec.Type != prowapi.PeriodicJob {
			continue
		}

		latestPJ := latestPeriodics[prowJob.Spec.Job]
		if isActivePeriodic[prowJob.Spec.Job] && prowJob.ObjectMeta.Name == latestPJ.ObjectMeta.Name {
			// Ignore deleting this one.
			continue
		}
		if !prowJob.Complete() {
			continue
		}
		isFinished.Insert(prowJob.ObjectMeta.Name)
		if time.Since(prowJob.Status.StartTime.Time) <= maxProwJobAge {
			continue
		}
		if err := c.prowJobClient.Delete(c.ctx, &prowJob); err == nil {
			c.logger.WithFields(pjutil.ProwJobFields(&prowJob)).Info("Deleted prowjob.")
			metrics.prowJobsCleaned[reasonProwJobAgedPeriodic]++
		} else {
			c.logger.WithFields(pjutil.ProwJobFields(&prowJob)).WithError(err).Error("Error deleting prowjob.")
			metrics.prowJobsCleaningErrors[string(k8serrors.ReasonForError(err))]++
		}
	}
	// Now clean up old pods.
	for cluster, client := range c.podClients {
		log := c.logger.WithField("cluster", cluster)

		var isClusterExcluded bool
		for _, excludeCluster := range c.config().Sinker.ExcludeClusters {
			if excludeCluster == cluster {
				isClusterExcluded = true
				break
			}
		}
		if isClusterExcluded {
			log.Debugf("Cluster %q is excluded, skipping pods deletion.", cluster)
			continue
		}

		var pods corev1api.PodList
		if err := client.List(c.ctx, &pods, ctrlruntimeclient.MatchingLabels{kube.CreatedByProw: "true"}, ctrlruntimeclient.InNamespace(c.config().PodNamespace)); err != nil {
			log.WithError(err).Error("Error listing pods.")
			continue
		}
		log.WithField("pod-count", len(pods.Items)).Debug("Successfully listed pods.")
		metrics.podsCreated += len(pods.Items)

		// maxPodAge limits the overall lifetime of a pod; terminatedPodTTL limits
		// how long a pod may remain after its ProwJob has completed.
		maxPodAge := c.config().Sinker.MaxPodAge.Duration
		terminatedPodTTL := c.config().Sinker.TerminatedPodTTL.Duration
		for _, pod := range pods.Items {
			reason := ""
			clean := false

			// By default, use the pod name as the key to match the associated ProwJob.
			// This is to support legacy plank in case the kube.ProwJobIDLabel label is not set.
			podJobName := pod.ObjectMeta.Name
			// If the pod has the kube.ProwJobIDLabel label, use it instead of the pod name.
			if value, ok := pod.ObjectMeta.Labels[kube.ProwJobIDLabel]; ok {
				podJobName = value
			}
			log = log.WithField("pj", podJobName)

			terminationTime := time.Time{}
			if pj, ok := pjMap[podJobName]; ok && pj.Complete() {
				terminationTime = pj.Status.CompletionTime.Time
			}

			// Remove the Kubernetes reporter finalizer if it would otherwise block
			// deletion of this pod.
			if podNeedsKubernetesFinalizerCleanup(log, pjMap[podJobName], &pod) {
				if err := c.cleanupKubernetesFinalizer(&pod, client); err != nil {
					log.WithError(err).Error("Failed to remove kubernetesreporter finalizer")
				}
			}
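			// Age-based cleanup criteria: the pod has outlived maxPodAge, or its
			// ProwJob finished more than terminatedPodTTL ago.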
			switch {
			case !pod.Status.StartTime.IsZero() && time.Since(pod.Status.StartTime.Time) > maxPodAge:
				clean = true
				reason = reasonPodAged
			case !terminationTime.IsZero() && time.Since(terminationTime) > terminatedPodTTL:
				clean = true
				reason = reasonPodTTLed
			}

			if !isFinished.Has(podJobName) {
				// The ProwJob exists but is not marked as completed yet; deleting the
				// pod now would only cause plank to create a brand new pod.
				clean = false
			}
			if c.isPodOrphaned(log, &pod, podJobName) {
				// The ProwJob is gone; clean up orphaned pods regardless of their state.
				reason = reasonPodOrphaned
				clean = true
			}

			if !clean {
				continue
			}

			c.deletePod(log, &pod, reason, client, &metrics)
		}
	}
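	// Publish this run's counters to the sinker Prometheus metrics.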
	metrics.finishedAt = time.Now()
	sinkerMetrics.podsCreated.Set(float64(metrics.podsCreated))
	sinkerMetrics.timeUsed.Set(float64(metrics.getTimeUsed().Seconds()))
	for k, v := range metrics.podsRemoved {
		sinkerMetrics.podsRemoved.WithLabelValues(k).Set(float64(v))
	}
	for k, v := range metrics.podRemovalErrors {
		sinkerMetrics.podRemovalErrors.WithLabelValues(k).Set(float64(v))
	}
	sinkerMetrics.prowJobsCreated.Set(float64(metrics.prowJobsCreated))
	for k, v := range metrics.prowJobsCleaned {
		sinkerMetrics.prowJobsCleaned.WithLabelValues(k).Set(float64(v))
	}
	for k, v := range metrics.prowJobsCleaningErrors {
		sinkerMetrics.prowJobsCleaningErrors.WithLabelValues(k).Set(float64(v))
	}

	version.GatherProwVersion(c.logger)
	c.logger.Info("Sinker reconciliation complete.")
}