internal/k8sCommon/k8sclient/servicetoworkload.go (66 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package k8sclient
import (
"sync"
"time"
mapset "github.com/deckarep/golang-set/v2"
"go.uber.org/zap"
)
type ServiceToWorkloadMapper struct {
serviceAndNamespaceToSelectors *sync.Map
workloadAndNamespaceToLabels *sync.Map
serviceToWorkload *sync.Map
logger *zap.Logger
deleter Deleter
}
func NewServiceToWorkloadMapper(serviceAndNamespaceToSelectors, workloadAndNamespaceToLabels, serviceToWorkload *sync.Map, logger *zap.Logger, deleter Deleter) *ServiceToWorkloadMapper {
return &ServiceToWorkloadMapper{
serviceAndNamespaceToSelectors: serviceAndNamespaceToSelectors,
workloadAndNamespaceToLabels: workloadAndNamespaceToLabels,
serviceToWorkload: serviceToWorkload,
logger: logger,
deleter: deleter,
}
}
func (m *ServiceToWorkloadMapper) mapServiceToWorkload() {
m.logger.Debug("Map service to workload at:", zap.Time("time", time.Now()))
m.serviceAndNamespaceToSelectors.Range(func(key, value interface{}) bool {
var workloads []string
serviceAndNamespace := key.(string)
_, serviceNamespace := ExtractResourceAndNamespace(serviceAndNamespace)
serviceLabels := value.(mapset.Set[string])
m.workloadAndNamespaceToLabels.Range(func(workloadKey, labelsValue interface{}) bool {
labels := labelsValue.(mapset.Set[string])
workloadAndNamespace := workloadKey.(string)
_, workloadNamespace := ExtractResourceAndNamespace(workloadAndNamespace)
if workloadNamespace == serviceNamespace && workloadNamespace != "" && serviceLabels.IsSubset(labels) {
m.logger.Debug("Found workload for service", zap.String("service", serviceAndNamespace), zap.String("workload", workloadAndNamespace))
workloads = append(workloads, workloadAndNamespace)
}
return true
})
if len(workloads) > 1 {
m.logger.Info("Multiple workloads found for service. You will get unexpected results.", zap.String("service", serviceAndNamespace), zap.Strings("workloads", workloads))
} else if len(workloads) == 1 {
m.serviceToWorkload.Store(serviceAndNamespace, workloads[0])
} else {
m.logger.Debug("No workload found for service", zap.String("service", serviceAndNamespace))
m.deleter.DeleteWithDelay(m.serviceToWorkload, serviceAndNamespace)
}
return true
})
}
func (m *ServiceToWorkloadMapper) Start(stopCh chan struct{}) {
// do the first mapping immediately
m.mapServiceToWorkload()
m.logger.Debug("First-time map service to workload at:", zap.Time("time", time.Now()))
go func() {
for {
select {
case <-stopCh:
return
case <-time.After(time.Minute + 30*time.Second):
m.mapServiceToWorkload()
m.logger.Debug("Map service to workload at:", zap.Time("time", time.Now()))
}
}
}()
}