func doReconcile()

in pkg/controller/remotecluster/controller.go [137:324]


// doReconcile reconciles the remote cluster resources of remoteServer with respect to the
// expected remote clients returned by getExpectedRemoteClientsFor.
//
// The reconciliation goes through the following steps:
//  1. Stop early, without error, if enterprise features are disabled while at least one
//     remote client is expected.
//  2. List the remote CAs currently associated with remoteServer; the ones still present in
//     that list at the end of the function are deleted.
//  3. If remoteServer supports remote cluster API keys, require the Elasticsearch endpoints to
//     be available (otherwise requeue with defaultRequeue), create an Elasticsearch client and
//     fetch the cross-cluster API keys matching "eck-*".
//  4. For each expected remote client: check that the association is allowed, create or update
//     the certificate authorities, then reconcile the API keys when both clusters report a
//     compatible version.
//  5. If remoteServer supports remote cluster API keys, invalidate the API keys of client
//     clusters which have not been reconciled in step 4 and remove the unexpected aliases from
//     the keystore of remoteServer.
//  6. Delete the remote CAs which are no longer expected.
//
// Errors returned by the preliminary lookups are returned immediately. Most of the other errors
// are accumulated and returned aggregated, together with the requeue computed by
// association.RequeueRbacCheck.
func doReconcile(
	ctx context.Context,
	r *ReconcileRemoteClusters,
	remoteServer *esv1.Elasticsearch,
) (reconcile.Result, error) {
	log := ulog.FromContext(ctx)

	remoteServerKey := k8s.ExtractNamespacedName(remoteServer)

	expectedRemoteClients, err := getExpectedRemoteClientsFor(ctx, r.Client, remoteServer)
	if err != nil {
		return reconcile.Result{}, err
	}

	enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
	if err != nil {
		return defaultRequeue, err
	}
	if !enabled && len(expectedRemoteClients) > 0 {
		log.V(1).Info(
			"Remote cluster controller is an enterprise feature. Enterprise features are disabled",
			"namespace", remoteServer.Namespace, "es_name", remoteServer.Name,
		)
		return reconcile.Result{}, nil
	}

	// Get all the clusters to which this reconciled cluster is connected to according to the existing remote CAs.
	// associatedRemoteCAs is used to delete the CA certificates and cancel any trust relationships
	// that may have existed in the past but should not exist anymore.
	associatedRemoteCAs, err := getAssociatedRemoteCAs(ctx, r.Client, remoteServerKey)
	if err != nil {
		return reconcile.Result{}, err
	}

	var (
		// activeAPIKeys and esClient keep their zero value when the reconciled cluster
		// does not support remote cluster API keys.
		activeAPIKeys esclient.CrossClusterAPIKeyList
		esClient      esclient.Client
	)
	remoteServerSupportsClusterAPIKeys, err := remoteServer.SupportsRemoteClusterAPIKeys()
	if err != nil {
		return reconcile.Result{}, err
	}
	results := &reconciler.Results{}
	if remoteServerSupportsClusterAPIKeys.IsTrue() {
		// Check if the ES API is available. We need it to create, update and invalidate
		// API keys in this cluster.
		if !services.NewElasticsearchURLProvider(*remoteServer, r.Client).HasEndpoints() {
			log.Info("Elasticsearch API is not available yet")
			return results.WithResult(defaultRequeue).Aggregate()
		}
		// Create a new client
		newEsClient, err := r.esClientProvider(ctx, r.Client, r.Dialer, *remoteServer)
		if err != nil {
			return reconcile.Result{}, err
		}
		// Keep the client in the function-wide variable, it is used again after this block.
		// NOTE(review): the client is never closed in this function - confirm whether
		// esclient.Client requires an explicit close to release its connections.
		esClient = newEsClient
		// Get all the API Keys, for that specific client, on the reconciled cluster.
		crossClusterAPIKeys, err := esClient.GetCrossClusterAPIKeys(ctx, "eck-*")
		if err != nil {
			return reconcile.Result{}, err
		}
		activeAPIKeys = crossClusterAPIKeys
	}

	// apiKeyReconciledRemoteClients is used to track all the client clusters for which API keys have already been reconciled.
	// This is used to garbage collect API keys for clusters which have been deleted and are not in expectedRemoteClusters.
	apiKeyReconciledRemoteClients := sets.New[types.NamespacedName]()

	// Main loop to:
	// 1. Create or update expected remote CA.
	// 2. Create or update API keys and keystores.
	// Note that entries may be deleted from expectedRemoteClients while iterating over it.
	for remoteClientKey, remoteClusterRefs := range expectedRemoteClients {
		// Get the remote/client Elasticsearch cluster associated with this local/reconciled cluster.
		remoteClient := &esv1.Elasticsearch{}
		if err := r.Client.Get(ctx, remoteClientKey, remoteClient); err != nil {
			if errors.IsNotFound(err) {
				// Remote client cluster does not exist, invalidate API keys for that client cluster.
				// The failed Get is not expected to have populated remoteClient, and the key is not
				// passed to reconcileAPIKeys by any other argument: carry the identity of the client
				// cluster over, otherwise the callee has no way to know which cluster is targeted
				// while the garbage collection below skips it because it is marked as reconciled.
				remoteClient.Namespace = remoteClientKey.Namespace
				remoteClient.Name = remoteClientKey.Name
				apiKeyReconciledRemoteClients.Insert(remoteClientKey)
				results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, nil, esClient, r.keystoreProvider))
				continue
			}
			return reconcile.Result{}, err
		}
		clientLog := log.WithValues(
			"remote_server_namespace", remoteServer.Namespace,
			"remote_server", remoteServer.Name,
			"remote_client_namespace", remoteClient.Namespace,
			"remote_client_name", remoteClient.Name,
		)
		accessAllowed, err := isRemoteClusterAssociationAllowed(ctx, r.accessReviewer, remoteServer, remoteClient, r.recorder)
		if err != nil {
			return reconcile.Result{}, err
		}
		// if the remote CA exists but isn't allowed anymore, it will be deleted next
		if !accessAllowed {
			// Remove from the expected remote cluster to clean up local keystore.
			delete(expectedRemoteClients, remoteClientKey)
			// Invalidate API keys for that client cluster.
			apiKeyReconciledRemoteClients.Insert(remoteClientKey)
			results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, nil, esClient, r.keystoreProvider))
			continue
		}
		// The association is allowed: the remote CA is expected and must not be deleted
		// at the end of the function.
		delete(associatedRemoteCAs, remoteClientKey)
		results.WithResults(createOrUpdateCertificateAuthorities(ctx, r, remoteServer, remoteClient))
		if results.HasError() {
			return results.Aggregate()
		}

		// RCS2, first check that both the reconciled and the client clusters are compatible.
		clientClusterSupportsClusterAPIKeys, err := remoteClient.SupportsRemoteClusterAPIKeys()
		if err != nil {
			results.WithError(err)
			continue
		}

		if !clientClusterSupportsClusterAPIKeys.IsSet() {
			clientLog.Info("Client cluster version is not available in status yet, skipping API keys reconciliation")
			continue
		}

		if !remoteServerSupportsClusterAPIKeys.IsSet() {
			clientLog.Info("Cluster version is not available in status yet, skipping API keys reconciliation")
			continue
		}

		if clientClusterSupportsClusterAPIKeys.IsFalse() && remoteServerSupportsClusterAPIKeys.IsTrue() {
			// The incompatibility is only logged, it is not added to the reconciliation results.
			err := fmt.Errorf("client cluster %s/%s is running version %s which does not support remote cluster keys", remoteClient.Namespace, remoteClient.Name, remoteClient.Spec.Version)
			clientLog.Error(err, "cannot configure remote cluster settings")
			continue
		}
		// Reconcile the API Keys.
		apiKeyReconciledRemoteClients.Insert(remoteClientKey)
		results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, remoteClusterRefs, esClient, r.keystoreProvider))
	}

	if remoteServerSupportsClusterAPIKeys.IsTrue() {
		// **************************************************************
		// Delete orphaned API keys from clusters which have been deleted
		// **************************************************************
		for _, activeAPIKey := range activeAPIKeys.APIKeys {
			clientCluster, err := activeAPIKey.GetElasticsearchName()
			if err != nil {
				results.WithError(err)
				continue
			}
			if apiKeyReconciledRemoteClients.Has(clientCluster) {
				// API keys for that client cluster have already been reconciled, skip.
				continue
			}
			// This API key in the local cluster state belongs to an unknown cluster which is not expected and has not been reconciled.
			log.Info(fmt.Sprintf("Invalidating API key %s which belongs to unknown cluster %s", activeAPIKey.Name, clientCluster))
			results.WithError(esClient.InvalidateCrossClusterAPIKey(ctx, activeAPIKey.Name))
		}

		// *********************************************
		// Delete unexpected keys in the local keystore.
		// *********************************************
		// The local variable is deliberately not named after the expectedAliases function,
		// to avoid shadowing it.
		expectedKeystoreAliases := expectedAliases(remoteServer, expectedRemoteClients)
		apiKeyStore, err := r.keystoreProvider.ForCluster(ctx, log, remoteServer)
		if err != nil {
			return results.WithError(err).Aggregate()
		}

		for alias := range apiKeyStore.GetAliases() {
			if expectedKeystoreAliases.Has(alias) {
				// Expected alias
				continue
			}
			// Unexpected
			log.Info(fmt.Sprintf("Removing unexpected remote API key %s", alias))
			apiKeyStore.Delete(alias)
		}
		results.WithResults(apiKeyStore.Save(ctx, r.Client, remoteServer))
	}

	// Delete existing but not expected remote CA
	for toDelete := range associatedRemoteCAs {
		log.V(1).Info("Deleting remote CA",
			"local_namespace", remoteServer.Namespace,
			"local_name", remoteServer.Name,
			"remote_namespace", toDelete.Namespace,
			"remote_name", toDelete.Name,
		)
		results.WithError(deleteCertificateAuthorities(ctx, r, remoteServerKey, toDelete))
	}
	return results.WithResult(association.RequeueRbacCheck(r.accessReviewer)).Aggregate()
}