pkg/controller/remotecluster/keystore/provider.go (71 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package keystore import ( "context" "sync" "github.com/go-logr/logr" "k8s.io/apimachinery/pkg/types" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ) type pendingChangesPerCluster struct { pendingChangesPerCluster map[types.NamespacedName]*pendingChanges mu sync.RWMutex } func NewProvider(c k8s.Client) *Provider { return &Provider{ c: c, pendingChangesPerCluster: pendingChangesPerCluster{ pendingChangesPerCluster: make(map[types.NamespacedName]*pendingChanges), }, } } type Provider struct { c k8s.Client pendingChangesPerCluster pendingChangesPerCluster } func (p *Provider) ForgetCluster(name types.NamespacedName) { if p == nil { return } p.pendingChangesPerCluster.mu.Lock() defer p.pendingChangesPerCluster.mu.Unlock() delete(p.pendingChangesPerCluster.pendingChangesPerCluster, name) } func (p *Provider) ForCluster(ctx context.Context, log logr.Logger, owner *esv1.Elasticsearch) (*APIKeyStore, error) { if p == nil { return nil, nil } name := types.NamespacedName{ Namespace: owner.Namespace, Name: owner.Name, } pendingChanges := p.forCluster(name) if pendingChanges != nil { return loadAPIKeyStore(ctx, log, p.c, owner, pendingChanges) } return loadAPIKeyStore(ctx, log, p.c, owner, p.newForCluster(name)) } func (p *Provider) forCluster(name types.NamespacedName) *pendingChanges { if p == nil { return nil } p.pendingChangesPerCluster.mu.RLock() defer p.pendingChangesPerCluster.mu.RUnlock() return p.pendingChangesPerCluster.pendingChangesPerCluster[name] } func (p *Provider) newForCluster(name types.NamespacedName) *pendingChanges { if p == nil { return nil } p.pendingChangesPerCluster.mu.Lock() defer p.pendingChangesPerCluster.mu.Unlock() // Check if another goroutine did not create the pending changes currentPendingChanges := p.pendingChangesPerCluster.pendingChangesPerCluster[name] if currentPendingChanges != nil { return currentPendingChanges } newPendingChanges := &pendingChanges{ changes: make(map[string]pendingChange), } p.pendingChangesPerCluster.pendingChangesPerCluster[name] = newPendingChanges return newPendingChanges }