pkg/helper/k8smeta/k8s_meta_manager.go (208 lines of code) (raw):

package k8smeta import ( "context" "fmt" "strings" "sync" "sync/atomic" "time" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" controllerConfig "sigs.k8s.io/controller-runtime/pkg/client/config" "github.com/alibaba/ilogtail/pkg/flags" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/selfmonitor" ) var metaManager *MetaManager var onceManager sync.Once type MetaCache interface { Get(key []string) map[string][]*ObjectWrapper GetSize() int GetQueueSize() int List() []*ObjectWrapper Filter(filterFunc func(*ObjectWrapper) bool, limit int) []*ObjectWrapper RegisterSendFunc(key string, sendFunc SendFunc, interval int) UnRegisterSendFunc(key string) init(*kubernetes.Clientset) watch(stopCh <-chan struct{}) } type FlushCh struct { Ch chan *K8sMetaEvent ConfigName string } type MetaManager struct { clientset *kubernetes.Clientset stopCh chan struct{} ready atomic.Bool metadataHandler *metadataHandler cacheMap map[string]MetaCache linkGenerator *LinkGenerator linkRegisterMap map[string][]string registerLock sync.RWMutex // self metrics projectNames map[string]int metricRecord selfmonitor.MetricsRecord addEventCount selfmonitor.CounterMetric updateEventCount selfmonitor.CounterMetric deleteEventCount selfmonitor.CounterMetric cacheResourceGauge selfmonitor.GaugeMetric queueSizeGauge selfmonitor.GaugeMetric httpRequestCount selfmonitor.CounterMetric httpAvgDelayMs selfmonitor.CounterMetric httpMaxDelayMs selfmonitor.GaugeMetric } func GetMetaManagerInstance() *MetaManager { onceManager.Do(func() { metaManager = &MetaManager{ stopCh: make(chan struct{}), } metaManager.metadataHandler = newMetadataHandler(metaManager) metaManager.cacheMap = make(map[string]MetaCache) for _, resource := range AllResources { metaManager.cacheMap[resource] = newK8sMetaCache(metaManager.stopCh, resource) } metaManager.linkGenerator = NewK8sMetaLinkGenerator(metaManager.cacheMap) metaManager.linkRegisterMap = make(map[string][]string) metaManager.projectNames = make(map[string]int) }) return metaManager } func (m *MetaManager) Init(configPath string) (err error) { var config *rest.Config if len(configPath) > 0 { config, err = clientcmd.BuildConfigFromFlags("", configPath) if err != nil { return err } } else { // 创建 Kubernetes 客户端配置 config = controllerConfig.GetConfigOrDie() } // 创建 Kubernetes 客户端 clientset, err := kubernetes.NewForConfig(config) if err != nil { return err } m.clientset = clientset m.metricRecord = selfmonitor.MetricsRecord{} m.addEventCount = selfmonitor.NewCounterMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaAddEventTotal) m.updateEventCount = selfmonitor.NewCounterMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaUpdateEventTotal) m.deleteEventCount = selfmonitor.NewCounterMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaDeleteEventTotal) m.cacheResourceGauge = selfmonitor.NewGaugeMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaCacheSize) m.queueSizeGauge = selfmonitor.NewGaugeMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaQueueSize) m.httpRequestCount = selfmonitor.NewCounterMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaHTTPRequestTotal) m.httpAvgDelayMs = selfmonitor.NewAverageMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaHTTPAvgDelayMs) m.httpMaxDelayMs = selfmonitor.NewMaxMetricAndRegister(&m.metricRecord, selfmonitor.MetricRunnerK8sMetaHTTPMaxDelayMs) go func() { startTime := time.Now() for _, cache := range m.cacheMap { cache.init(clientset) } m.ready.Store(true) logger.Info(context.Background(), "init k8s meta manager", "success", "latancy (ms)", fmt.Sprintf("%d", time.Since(startTime).Milliseconds())) }() return nil } func (m *MetaManager) Run(stopCh chan struct{}) { m.stopCh = stopCh m.runServer() } func (m *MetaManager) IsReady() bool { return m.ready.Load() } func (m *MetaManager) RegisterSendFunc(projectName, configName, resourceType string, sendFunc SendFunc, interval int) { if cache, ok := m.cacheMap[resourceType]; ok { cache.RegisterSendFunc(configName, func(events []*K8sMetaEvent) { sendFunc(events) m.registerLock.RLock() for _, linkType := range m.linkRegisterMap[configName] { if strings.HasPrefix(linkType, resourceType) { linkEvents := m.linkGenerator.GenerateLinks(events, linkType) if linkEvents != nil { sendFunc(linkEvents) } } } m.registerLock.RUnlock() }, interval) m.registerLock.Lock() if cnt, ok := m.projectNames[projectName]; ok { m.projectNames[projectName] = cnt + 1 } else { m.projectNames[projectName] = 1 } m.registerLock.Unlock() return } // register link if !isEntity(resourceType) { m.registerLock.Lock() if _, ok := m.linkRegisterMap[configName]; !ok { m.linkRegisterMap[configName] = make([]string, 0) } m.linkRegisterMap[configName] = append(m.linkRegisterMap[configName], resourceType) m.registerLock.Unlock() } else { logger.Error(context.Background(), "ENTITY_PIPELINE_REGISTER_ERROR", "resourceType not support", resourceType) } } func (m *MetaManager) UnRegisterAllSendFunc(projectName, configName string) { for _, cache := range m.cacheMap { cache.UnRegisterSendFunc(configName) } m.registerLock.Lock() if cnt, ok := m.projectNames[projectName]; ok { if cnt == 1 { delete(m.projectNames, projectName) } else { m.projectNames[projectName] = cnt - 1 } } delete(m.linkRegisterMap, configName) m.registerLock.Unlock() } func GetMetaManagerMetrics() []map[string]string { manager := GetMetaManagerInstance() if manager == nil || !manager.IsReady() { return nil } // cache queueSize := 0 cacheSize := 0 for _, cache := range manager.cacheMap { queueSize += cache.GetQueueSize() cacheSize += cache.GetSize() } manager.queueSizeGauge.Set(float64(queueSize)) manager.cacheResourceGauge.Set(float64(cacheSize)) // set labels manager.registerLock.RLock() projectName := make([]string, 0) projectName = append(projectName, *flags.DefaultLogProject) for name := range manager.projectNames { projectName = append(projectName, name) } manager.registerLock.RUnlock() manager.metricRecord.Labels = []selfmonitor.LabelPair{ { Key: selfmonitor.MetricLabelKeyMetricCategory, Value: selfmonitor.MetricLabelValueMetricCategoryRunner, }, { Key: selfmonitor.MetricLabelKeyClusterID, Value: *flags.ClusterID, }, { Key: selfmonitor.MetricLabelKeyRunnerName, Value: selfmonitor.MetricLabelValueRunnerNameK8sMeta, }, { Key: selfmonitor.MetricLabelKeyProject, Value: strings.Join(projectName, " "), }, } return []map[string]string{ manager.metricRecord.ExportMetricRecords(), } } func (m *MetaManager) runServer() { go m.metadataHandler.K8sServerRun(m.stopCh) } func isEntity(resourceType string) bool { return !strings.Contains(resourceType, LINK_SPLIT_CHARACTER) }