in pkg/controller/remotecluster/controller.go [137:324]
// doReconcile reconciles, for the Elasticsearch cluster remoteServer, the remote
// certificate authorities, the cross-cluster API keys and the API key keystore
// entries associated with the client clusters returned by
// getExpectedRemoteClientsFor.
//
// The steps are performed in this order:
//  1. Compute the expected remote clients and return early, without error, if
//     enterprise features are disabled while at least one client is expected.
//  2. List the remote CAs currently associated with remoteServer; entries still
//     present in that collection at the end of the function are deleted.
//  3. If remoteServer supports remote cluster API keys, create an Elasticsearch
//     client and list the cross-cluster API keys matching "eck-*".
//  4. For each expected client: check that the association is allowed, create
//     or update the certificate authorities, then reconcile the API keys.
//  5. If remoteServer supports remote cluster API keys, invalidate the API keys
//     whose client cluster has not been handled in step 4 and remove the
//     unexpected aliases from the keystore of remoteServer.
//  6. Delete the remote CAs which are not expected anymore.
//
// Errors collected in the reconciler.Results accumulator are returned through
// Aggregate(); some failures return immediately with an empty reconcile.Result.
func doReconcile(
ctx context.Context,
r *ReconcileRemoteClusters,
remoteServer *esv1.Elasticsearch,
) (reconcile.Result, error) {
log := ulog.FromContext(ctx)
remoteServerKey := k8s.ExtractNamespacedName(remoteServer)
expectedRemoteClients, err := getExpectedRemoteClientsFor(ctx, r.Client, remoteServer)
if err != nil {
return reconcile.Result{}, err
}
enabled, err := r.licenseChecker.EnterpriseFeaturesEnabled(ctx)
if err != nil {
return defaultRequeue, err
}
// Only stop here when there is something to reconcile: with no expected client
// the function carries on, so the cleanup steps below still run.
// Note that this return happens before any cleanup of existing remote CAs or API keys.
if !enabled && len(expectedRemoteClients) > 0 {
log.V(1).Info(
"Remote cluster controller is an enterprise feature. Enterprise features are disabled",
"namespace", remoteServer.Namespace, "es_name", remoteServer.Name,
)
return reconcile.Result{}, nil
}
// Get all the clusters this reconciled cluster is connected to, according to the existing remote CAs.
// associatedRemoteCAs is used to delete the CA certificates and cancel any trust relationships
// that may have existed in the past but should not exist anymore.
associatedRemoteCAs, err := getAssociatedRemoteCAs(ctx, r.Client, remoteServerKey)
if err != nil {
return reconcile.Result{}, err
}
// activeAPIKeys and esClient are only assigned when the reconciled cluster supports
// remote cluster API keys; otherwise they keep their zero values and are still passed
// to reconcileAPIKeys in the main loop below.
// NOTE(review): presumably reconcileAPIKeys tolerates a nil esClient — confirm.
var (
activeAPIKeys esclient.CrossClusterAPIKeyList
esClient esclient.Client
)
remoteServerSupportsClusterAPIKeys, err := remoteServer.SupportsRemoteClusterAPIKeys()
if err != nil {
return reconcile.Result{}, err
}
results := &reconciler.Results{}
if remoteServerSupportsClusterAPIKeys.IsTrue() {
// Check if the ES API is available. We need it to create, update and invalidate
// API keys in this cluster. If there is no endpoint yet, requeue with defaultRequeue.
if !services.NewElasticsearchURLProvider(*remoteServer, r.Client).HasEndpoints() {
log.Info("Elasticsearch API is not available yet")
return results.WithResult(defaultRequeue).Aggregate()
}
// Create a new client
newEsClient, err := r.esClientProvider(ctx, r.Client, r.Dialer, *remoteServer)
if err != nil {
return reconcile.Result{}, err
}
// Keep the client in the function-scoped variable: it is reused in the main loop
// and for the invalidation of orphaned API keys.
esClient = newEsClient
// Get all the API keys matching the "eck-*" pattern on the reconciled cluster.
crossClusterAPIKeys, err := esClient.GetCrossClusterAPIKeys(ctx, "eck-*")
if err != nil {
return reconcile.Result{}, err
}
activeAPIKeys = crossClusterAPIKeys
}
// apiKeyReconciledRemoteClients is used to track all the client clusters for which API keys have already been reconciled.
// This is used to garbage collect API keys for clusters which have been deleted and are not in expectedRemoteClients.
apiKeyReconciledRemoteClients := sets.New[types.NamespacedName]()
// Main loop to:
// 1. Create or update expected remote CA.
// 2. Create or update API keys and keystores.
for remoteClientKey, remoteClusterRefs := range expectedRemoteClients {
// Get the remote/client Elasticsearch cluster associated with this local/reconciled cluster.
remoteClient := &esv1.Elasticsearch{}
if err := r.Client.Get(ctx, remoteClientKey, remoteClient); err != nil {
if errors.IsNotFound(err) {
// Remote client cluster does not exist, invalidate API keys for that client cluster.
// The key is not removed from associatedRemoteCAs, so an existing remote CA for
// that client is deleted at the end of the function.
// NOTE(review): Get failed, so remoteClient has presumably not been populated
// (empty name and namespace) — confirm that reconcileAPIKeys can still identify
// the API keys of remoteClientKey, as the key is marked as reconciled here and
// is therefore skipped by the orphaned API keys cleanup below.
apiKeyReconciledRemoteClients.Insert(remoteClientKey)
results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, nil, esClient, r.keystoreProvider))
continue
}
return reconcile.Result{}, err
}
// Shadow the function-level logger for the rest of this iteration only.
log := log.WithValues(
"remote_server_namespace", remoteServer.Namespace,
"remote_server", remoteServer.Name,
"remote_client_namespace", remoteClient.Namespace,
"remote_client_name", remoteClient.Name,
)
accessAllowed, err := isRemoteClusterAssociationAllowed(ctx, r.accessReviewer, remoteServer, remoteClient, r.recorder)
if err != nil {
return reconcile.Result{}, err
}
// if the remote CA exists but isn't allowed anymore, it will be deleted next
if !accessAllowed {
// Remove from the expected remote cluster to clean up local keystore.
// Deleting an entry from a map while ranging over it is permitted in Go.
delete(expectedRemoteClients, remoteClientKey)
// Invalidate API keys for that client cluster: nil is passed in place of the remote cluster references.
apiKeyReconciledRemoteClients.Insert(remoteClientKey)
results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, nil, esClient, r.keystoreProvider))
continue
}
// This client exists and is allowed: remove it from the CAs to be deleted at the end of the function.
delete(associatedRemoteCAs, remoteClientKey)
results.WithResults(createOrUpdateCertificateAuthorities(ctx, r, remoteServer, remoteClient))
// NOTE(review): results is shared by all the iterations, so this presumably also
// returns when an error has been recorded by a previous iteration — verify
// against reconciler.Results.
if results.HasError() {
return results.Aggregate()
}
// RCS2, first check that both the reconciled and the client clusters are compatible.
clientClusterSupportsClusterAPIKeys, err := remoteClient.SupportsRemoteClusterAPIKeys()
if err != nil {
results.WithError(err)
continue
}
if !clientClusterSupportsClusterAPIKeys.IsSet() {
log.Info("Client cluster version is not available in status yet, skipping API keys reconciliation")
continue
}
// This condition does not depend on the current client: the value is computed once before the loop.
if !remoteServerSupportsClusterAPIKeys.IsSet() {
log.Info("Cluster version is not available in status yet, skipping API keys reconciliation")
continue
}
if clientClusterSupportsClusterAPIKeys.IsFalse() && remoteServerSupportsClusterAPIKeys.IsTrue() {
// The error is only built to be logged: it is neither returned nor added to results.
// The client is not added to apiKeyReconciledRemoteClients either, so API keys
// attributed to it are candidates for invalidation in the cleanup below.
err := fmt.Errorf("client cluster %s/%s is running version %s which does not support remote cluster keys", remoteClient.Namespace, remoteClient.Name, remoteClient.Spec.Version)
log.Error(err, "cannot configure remote cluster settings")
continue
}
// Reconcile the API Keys.
apiKeyReconciledRemoteClients.Insert(remoteClientKey)
results.WithResults(reconcileAPIKeys(ctx, r.Client, activeAPIKeys, remoteServer, remoteClient, remoteClusterRefs, esClient, r.keystoreProvider))
}
// The cleanup below relies on esClient and activeAPIKeys, which are only set
// when the reconciled cluster supports remote cluster API keys.
if remoteServerSupportsClusterAPIKeys.IsTrue() {
// **************************************************************
// Delete orphaned API keys from clusters which have been deleted
// **************************************************************
for _, activeAPIKey := range activeAPIKeys.APIKeys {
clientCluster, err := activeAPIKey.GetElasticsearchName()
if err != nil {
results.WithError(err)
continue
}
if _, exists := apiKeyReconciledRemoteClients[clientCluster]; exists {
// API keys for that client cluster have already been reconciled, skip.
continue
}
// This API key in the local cluster state belongs to an unknown cluster which is not expected and has not been reconciled.
log.Info(fmt.Sprintf("Invalidating API key %s which belongs to unknown cluster %s", activeAPIKey.Name, clientCluster))
results.WithError(esClient.InvalidateCrossClusterAPIKey(ctx, activeAPIKey.Name))
}
// *********************************************
// Delete unexpected keys in the local keystore.
// *********************************************
// The local variable shadows the expectedAliases function from here on.
// expectedRemoteClients no longer contains the clients for which the association
// is not allowed: they have been removed in the main loop.
expectedAliases := expectedAliases(remoteServer, expectedRemoteClients)
apiKeyStore, err := r.keystoreProvider.ForCluster(ctx, log, remoteServer)
if err != nil {
return results.WithError(err).Aggregate()
}
for alias := range apiKeyStore.GetAliases() {
if expectedAliases.Has(alias) {
// Expected alias
continue
}
// Unexpected
log.Info(fmt.Sprintf("Removing unexpected remote API key %s", alias))
apiKeyStore.Delete(alias)
}
// Save is called even if no alias has been deleted above.
results.WithResults(apiKeyStore.Save(ctx, r.Client, remoteServer))
}
// Delete existing but not expected remote CA: what is left in associatedRemoteCAs
// are the entries which have not been removed in the main loop.
for toDelete := range associatedRemoteCAs {
log.V(1).Info("Deleting remote CA",
"local_namespace", remoteServer.Namespace,
"local_name", remoteServer.Name,
"remote_namespace", toDelete.Namespace,
"remote_name", toDelete.Name,
)
results.WithError(deleteCertificateAuthorities(ctx, r, remoteServerKey, toDelete))
}
// Merge the result of association.RequeueRbacCheck into the aggregated result.
return results.WithResult(association.RequeueRbacCheck(r.accessReviewer)).Aggregate()
}