func reconcileNodeSetTransportCertificatesSecrets()

in pkg/controller/elasticsearch/certificates/transport/reconcile.go [83:183]


// reconcileNodeSetTransportCertificatesSecrets reconciles the Secret that stores the transport
// certificates and keys of the Pods belonging to the StatefulSet ssetName of the given
// Elasticsearch resource.
//
// It proceeds as follows:
//   - lists the Pods matching the label selector built from es.Name and ssetName in es.Namespace;
//   - retrieves the Secret through ensureTransportCertificatesSecretExists;
//   - for each Pod that has an IP, either removes its cert/key entries (when the Pod carries the
//     esv1.TransportCertDisabledAnnotationName annotation) or ensures its entries are present, and
//     schedules a requeue based on the expiry of the Pod's certificate;
//   - removes the entries whose key prefix does not match any listed Pod;
//   - sets or removes the disabledMarker entry depending on es.Spec.Transport.TLS.SelfSignedEnabled();
//   - delegates the CA entry to mayBeUpdateCAFile;
//   - updates the Secret only if it differs from its initial state, in which case every listed Pod
//     is passed to annotation.MarkPodAsUpdated.
//
// The returned Results carries the first error encountered, if any, and the aggregated requeue
// state.
func reconcileNodeSetTransportCertificatesSecrets(
	ctx context.Context,
	c k8s.Client,
	ca *certificates.CA,
	additionalCAs []byte,
	es esv1.Elasticsearch,
	ssetName string,
	rotationParams certificates.RotationParams,
) *reconciler.Results {
	results := &reconciler.Results{}
	log := ulog.FromContext(ctx)

	// List all the existing Pods in the nodeSet
	var pods corev1.PodList
	matchLabels := label.NewLabelSelectorForStatefulSetName(es.Name, ssetName)
	ns := client.InNamespace(es.Namespace)
	if err := c.List(ctx, &pods, matchLabels, ns); err != nil {
		return results.WithError(errors.WithStack(err))
	}

	secret, err := ensureTransportCertificatesSecretExists(ctx, c, es, ssetName)
	if err != nil {
		return results.WithError(err)
	}
	if secret.Data == nil {
		// Entries are written into this map below and assigning to a nil map panics.
		// NOTE(review): the helper above presumably already returns an initialized map; this
		// guard only protects against the case where it does not.
		secret.Data = make(map[string][]byte)
	}
	// defensive copy of the current secret so we can check whether we need to update later on
	currentTransportCertificatesSecret := secret.DeepCopy()

	for _, pod := range pods.Items {
		if pod.Status.PodIP == "" {
			log.Info("Skipping pod because it has no IP yet", "namespace", pod.Namespace, "pod_name", pod.Name)
			continue
		}

		if _, disabled := pod.Annotations[esv1.TransportCertDisabledAnnotationName]; disabled {
			// the Pod opted out of transport certificates: drop whatever was stored for it
			delete(secret.Data, PodCertFileName(pod.Name))
			delete(secret.Data, PodKeyFileName(pod.Name))
			continue
		}

		if err := ensureTransportCertificatesSecretContentsForPod(
			ctx, es, secret, pod, ca, rotationParams,
		); err != nil {
			return results.WithError(err)
		}
		certCommonName := buildCertificateCommonName(pod, es)
		cert := extractTransportCert(ctx, *secret, pod, certCommonName)
		if cert == nil {
			// name the Pod so that the failure can be traced in a nodeSet with several Pods
			return results.WithError(errors.New("no certificate found for pod " + pod.Namespace + "/" + pod.Name))
		}
		// handle cert expiry via requeue
		results.WithReconciliationState(
			reconciler.
				RequeueAfter(certificates.ShouldRotateIn(time.Now(), cert.NotAfter, rotationParams.RotateBefore)).
				ReconciliationComplete(),
		)
	}

	// remove certificates and keys for deleted pods
	podsByName := k8s.PodsByName(pods.Items)
	var keysToPrune []string
	for secretDataKey := range secret.Data {
		if secretDataKey == certificates.CAFileName {
			// never remove the CA file
			continue
		}

		// get the pod name from the secret key name (the first segment before the ".")
		podNameForKey := strings.SplitN(secretDataKey, ".", 2)[0]

		if _, ok := podsByName[podNameForKey]; !ok {
			// pod no longer exists, so the element is safe to delete.
			keysToPrune = append(keysToPrune, secretDataKey)
		}
	}
	if len(keysToPrune) > 0 {
		log.Info("Pruning keys from certificates secret", "namespace", es.Namespace, "secret_name", secret.Name, "keys", keysToPrune)

		for _, keyToRemove := range keysToPrune {
			delete(secret.Data, keyToRemove)
		}
	}

	// The marker is handled after pruning: only the CA file is exempt from the pruning above, so
	// the marker has to be (re)established here to reflect the current spec.
	if es.Spec.Transport.TLS.SelfSignedEnabled() {
		delete(secret.Data, disabledMarker)
	} else {
		// add a marker but leave all the old certs that might exist in the secret in place to ease the transition
		// to the disabled state.
		secret.Data[disabledMarker] = []byte("true") // contents is irrelevant
	}

	mayBeUpdateCAFile(secret, ca, additionalCAs)

	// only issue an API call when the reconciled secret differs from what was initially retrieved
	if !reflect.DeepEqual(secret, currentTransportCertificatesSecret) {
		if err := c.Update(ctx, secret); err != nil {
			return results.WithError(err)
		}
		for _, pod := range pods.Items {
			annotation.MarkPodAsUpdated(ctx, c, pod)
		}
	}

	return results
}