plugins/input/kubernetesmetav2/service_meta.go (110 lines of code) (raw):

package kubernetesmetav2 import ( "github.com/alibaba/ilogtail/pkg/flags" "github.com/alibaba/ilogtail/pkg/helper/k8smeta" "github.com/alibaba/ilogtail/pkg/models" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/selfmonitor" ) type ProcessFunc func(data *k8smeta.ObjectWrapper, method string) []models.PipelineEvent //revive:disable:exported type ServiceK8sMeta struct { //revive:enable:exported Interval int // entity switch Pod bool Node bool Service bool Deployment bool ReplicaSet bool DaemonSet bool StatefulSet bool Configmap bool Job bool CronJob bool Namespace bool PersistentVolume bool PersistentVolumeClaim bool StorageClass bool Ingress bool Container bool // link switch Node2Pod string Deployment2Pod string ReplicaSet2Pod string Deployment2ReplicaSet string StatefulSet2Pod string DaemonSet2Pod string Service2Pod string Pod2Container string CronJob2Job string Job2Pod string Ingress2Service string Pod2PersistentVolumeClaim string Pod2ConfigMap string // add link for namesapce Namespace2Pod string Namespace2Service string Namespace2Deployment string Namespace2DaemonSet string Namespace2StatefulSet string Namespace2Configmap string Namespace2Job string Namespace2CronJob string Namespace2PersistentVolume string Namespace2PersistentVolumeClaim string Namespace2StorageClass string Namespace2Ingress string // restrict cluster link dest target Cluster2Node string Cluster2Namespace string Cluster2PersistentVolume string Cluster2StorageClass string // other context pipeline.Context metaManager *k8smeta.MetaManager collector pipeline.Collector metaCollector *metaCollector configName string clusterID string domain string // self metric entityCount selfmonitor.CounterMetric linkCount selfmonitor.CounterMetric } // Init called for init some system resources, like socket, mutex... // return interval(ms) and error flag, if interval is 0, use default interval func (s *ServiceK8sMeta) Init(context pipeline.Context) (int, error) { s.context = context s.metaManager = k8smeta.GetMetaManagerInstance() s.configName = context.GetConfigName() s.initDomain() metricRecord := s.context.GetMetricRecord() s.entityCount = selfmonitor.NewCounterMetricAndRegister(metricRecord, selfmonitor.MetricCollectEntityTotal) s.linkCount = selfmonitor.NewCounterMetricAndRegister(metricRecord, selfmonitor.MetricCollectLinkTotal) return 0, nil } // Description returns a one-sentence description on the Input func (s *ServiceK8sMeta) Description() string { return "" } // Stop stops the services and closes any necessary channels and connections func (s *ServiceK8sMeta) Stop() error { return s.metaCollector.Stop() } func (s *ServiceK8sMeta) Start(collector pipeline.Collector) error { s.collector = collector s.metaCollector = &metaCollector{ serviceK8sMeta: s, collector: collector, entityBuffer: make(chan models.PipelineEvent, 100), entityLinkBuffer: make(chan models.PipelineEvent, 100), stopCh: make(chan struct{}), entityProcessor: make(map[string]ProcessFunc), } return s.metaCollector.Start() } func (s *ServiceK8sMeta) initDomain() { switch *flags.ClusterType { case ackCluster, oneCluster, asiCluster: s.domain = acsDomain default: s.domain = infraDomain } } func init() { pipeline.ServiceInputs["service_kubernetes_meta"] = func() pipeline.ServiceInput { return &ServiceK8sMeta{ Interval: 60, clusterID: *flags.ClusterID, } } }