plugins/input/kubernetesmetav2/meta_collector.go (422 lines of code) (raw):
package kubernetesmetav2
import (
"context"
"encoding/json"
// #nosec G501
"crypto/md5"
"fmt"
"strconv"
"strings"
"time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/alibaba/ilogtail/pkg/flags"
"github.com/alibaba/ilogtail/pkg/helper/k8smeta"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/models"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
)
type metaCollector struct {
serviceK8sMeta *ServiceK8sMeta
collector pipeline.Collector
entityBuffer chan models.PipelineEvent
entityLinkBuffer chan models.PipelineEvent
stopCh chan struct{}
entityProcessor map[string]ProcessFunc
}
func (m *metaCollector) Start() error {
m.entityProcessor = map[string]ProcessFunc{
k8smeta.POD: m.processPodEntity,
k8smeta.NODE: m.processNodeEntity,
k8smeta.SERVICE: m.processServiceEntity,
k8smeta.DEPLOYMENT: m.processDeploymentEntity,
k8smeta.REPLICASET: m.processReplicaSetEntity,
k8smeta.DAEMONSET: m.processDaemonSetEntity,
k8smeta.STATEFULSET: m.processStatefulSetEntity,
k8smeta.CONFIGMAP: m.processConfigMapEntity,
k8smeta.JOB: m.processJobEntity,
k8smeta.CRONJOB: m.processCronJobEntity,
k8smeta.NAMESPACE: m.processNamespaceEntity,
k8smeta.PERSISTENTVOLUME: m.processPersistentVolumeEntity,
k8smeta.PERSISTENTVOLUMECLAIM: m.processPersistentVolumeClaimEntity,
k8smeta.STORAGECLASS: m.processStorageClassEntity,
k8smeta.INGRESS: m.processIngressEntity,
k8smeta.POD_NODE: m.processPodNodeLink,
k8smeta.POD_DEPLOYMENT: m.processPodDeploymentLink,
k8smeta.POD_REPLICASET: m.processPodReplicaSetLink,
k8smeta.REPLICASET_DEPLOYMENT: m.processReplicaSetDeploymentLink,
k8smeta.POD_STATEFULSET: m.processPodStatefulSetLink,
k8smeta.POD_DAEMONSET: m.processPodDaemonSetLink,
k8smeta.JOB_CRONJOB: m.processJobCronJobLink,
k8smeta.POD_JOB: m.processPodJobLink,
k8smeta.POD_PERSISENTVOLUMECLAIN: m.processPodPVCLink,
k8smeta.POD_CONFIGMAP: m.processPodConfigMapLink,
k8smeta.POD_SERVICE: m.processPodServiceLink,
k8smeta.POD_CONTAINER: m.processPodContainerLink,
k8smeta.INGRESS_SERVICE: m.processIngressServiceLink,
// add namesapce to xx link processor
k8smeta.POD_NAMESPACE: m.processPodNamespaceLink,
k8smeta.SERVICE_NAMESPACE: m.processServiceNamespaceLink,
k8smeta.DEPLOYMENT_NAMESPACE: m.processDeploymentNamespaceLink,
k8smeta.DAEMONSET_NAMESPACE: m.processDaemonSetNamespaceLink,
k8smeta.STATEFULSET_NAMESPACE: m.processStatefulNamespaceSetLink,
k8smeta.CONFIGMAP_NAMESPACE: m.processConfigMapNamespaceLink,
k8smeta.JOB_NAMESPACE: m.processJobNamespaceLink,
k8smeta.CRONJOB_NAMESPACE: m.processCronJobNamespaceLink,
k8smeta.PERSISTENTVOLUMECLAIM_NAMESPACE: m.processPVCNamespaceLink,
k8smeta.INGRESS_NAMESPACE: m.processIngressNamespaceLink,
}
if m.serviceK8sMeta.Pod {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Node {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.NODE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Service {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.SERVICE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Deployment {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.DEPLOYMENT, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.ReplicaSet {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.REPLICASET, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.DaemonSet {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.DAEMONSET, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.StatefulSet {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.STATEFULSET, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Configmap {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.CONFIGMAP, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Job {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.JOB, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.CronJob {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.CRONJOB, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.PersistentVolume {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.PERSISTENTVOLUME, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.PersistentVolumeClaim {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.PERSISTENTVOLUMECLAIM, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.StorageClass {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.STORAGECLASS, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Ingress {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.INGRESS, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Pod && m.serviceK8sMeta.Node && m.serviceK8sMeta.Node2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_NODE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Deployment && m.serviceK8sMeta.Pod && m.serviceK8sMeta.Deployment2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_DEPLOYMENT, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.ReplicaSet && m.serviceK8sMeta.Pod && m.serviceK8sMeta.ReplicaSet2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_REPLICASET, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Deployment && m.serviceK8sMeta.ReplicaSet && m.serviceK8sMeta.Deployment2ReplicaSet != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.REPLICASET_DEPLOYMENT, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.StatefulSet && m.serviceK8sMeta.Pod && m.serviceK8sMeta.StatefulSet2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_STATEFULSET, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.DaemonSet && m.serviceK8sMeta.Pod && m.serviceK8sMeta.DaemonSet2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_DAEMONSET, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.CronJob && m.serviceK8sMeta.Job && m.serviceK8sMeta.CronJob2Job != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.JOB_CRONJOB, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Job && m.serviceK8sMeta.Pod && m.serviceK8sMeta.Job2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_JOB, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Pod && m.serviceK8sMeta.PersistentVolumeClaim && m.serviceK8sMeta.Pod2PersistentVolumeClaim != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_PERSISENTVOLUMECLAIN, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Pod && m.serviceK8sMeta.Configmap && m.serviceK8sMeta.Pod2ConfigMap != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_CONFIGMAP, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Service && m.serviceK8sMeta.Pod && m.serviceK8sMeta.Service2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_SERVICE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Pod && m.serviceK8sMeta.Container && m.serviceK8sMeta.Pod2Container != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_CONTAINER, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Ingress && m.serviceK8sMeta.Service && m.serviceK8sMeta.Ingress2Service != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.INGRESS_SERVICE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.Pod && m.serviceK8sMeta.Namespace2Pod != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.POD_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.Service && m.serviceK8sMeta.Namespace2Service != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.SERVICE_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.Deployment && m.serviceK8sMeta.Namespace2Deployment != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.DEPLOYMENT_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.DaemonSet && m.serviceK8sMeta.Namespace2DaemonSet != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.DAEMONSET_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.StatefulSet && m.serviceK8sMeta.Namespace2StatefulSet != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.STATEFULSET_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.Configmap && m.serviceK8sMeta.Namespace2Configmap != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.CONFIGMAP_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.Job && m.serviceK8sMeta.Namespace2Job != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.JOB_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.CronJob && m.serviceK8sMeta.Namespace2CronJob != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.CRONJOB_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.PersistentVolumeClaim && m.serviceK8sMeta.Namespace2PersistentVolumeClaim != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.PERSISTENTVOLUMECLAIM_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
if m.serviceK8sMeta.Namespace && m.serviceK8sMeta.Ingress && m.serviceK8sMeta.Namespace2Ingress != "" {
m.serviceK8sMeta.metaManager.RegisterSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName, k8smeta.INGRESS_NAMESPACE, m.handleEvent, m.serviceK8sMeta.Interval)
}
go m.sendInBackground()
return nil
}
func (m *metaCollector) Stop() error {
m.serviceK8sMeta.metaManager.UnRegisterAllSendFunc(m.serviceK8sMeta.context.GetProject(), m.serviceK8sMeta.configName)
close(m.stopCh)
return nil
}
func canClusterLinkDirectly(resourceType string, serviceK8sMeta *ServiceK8sMeta) bool {
if strings.ToLower(resourceType) == "namespace" && serviceK8sMeta.Namespace && serviceK8sMeta.Cluster2Namespace != "" {
return true
}
if strings.ToLower(resourceType) == "node" && serviceK8sMeta.Node && serviceK8sMeta.Cluster2Node != "" {
return true
}
if strings.ToLower(resourceType) == "persistentvolume" && serviceK8sMeta.PersistentVolume && serviceK8sMeta.Cluster2PersistentVolume != "" {
return true
}
if strings.ToLower(resourceType) == "storageclass" && serviceK8sMeta.StorageClass && serviceK8sMeta.Cluster2StorageClass != "" {
return true
}
return false
}
func (m *metaCollector) handleEvent(event []*k8smeta.K8sMetaEvent) {
if len(event) == 0 {
return
}
switch event[0].EventType {
case k8smeta.EventTypeAdd, k8smeta.EventTypeUpdate:
for _, e := range event {
m.handleAddOrUpdate(e)
}
case k8smeta.EventTypeDelete:
for _, e := range event {
m.handleDelete(e)
}
default:
logger.Error(context.Background(), "UNKNOWN_EVENT_TYPE", "unknown event type", event[0].EventType)
}
}
func (m *metaCollector) handleAddOrUpdate(event *k8smeta.K8sMetaEvent) {
if processor, ok := m.entityProcessor[event.Object.ResourceType]; ok {
logs := processor(event.Object, "Update")
for _, log := range logs {
m.send(log, isEntity(event.Object.ResourceType))
if isEntity(event.Object.ResourceType) && canClusterLinkDirectly(event.Object.ResourceType, m.serviceK8sMeta) {
link := m.generateEntityClusterLink(log)
m.send(link, true)
}
}
}
}
func (m *metaCollector) handleDelete(event *k8smeta.K8sMetaEvent) {
if processor, ok := m.entityProcessor[event.Object.ResourceType]; ok {
logs := processor(event.Object, "Expire")
for _, log := range logs {
m.send(log, isEntity(event.Object.ResourceType))
if isEntity(event.Object.ResourceType) && canClusterLinkDirectly(event.Object.ResourceType, m.serviceK8sMeta) {
link := m.generateEntityClusterLink(log)
m.send(link, true)
}
}
}
}
func (m *metaCollector) processEntityCommonPart(logContents models.LogContents, kind, namespace, name, method string, firstObservedTime, lastObservedTime int64, creationTime v1.Time) {
// entity reserved fields
logContents.Add(entityDomainFieldName, m.serviceK8sMeta.domain)
logContents.Add(entityTypeFieldName, m.genEntityTypeKey(kind))
logContents.Add(entityIDFieldName, m.genKey(kind, namespace, name))
logContents.Add(entityMethodFieldName, method)
logContents.Add(entityFirstObservedTimeFieldName, strconv.FormatInt(firstObservedTime, 10))
logContents.Add(entityLastObservedTimeFieldName, strconv.FormatInt(lastObservedTime, 10))
logContents.Add(entityKeepAliveSecondsFieldName, strconv.FormatInt(int64(m.serviceK8sMeta.Interval*2), 10))
logContents.Add(entityCategoryFieldName, defaultEntityCategory)
// common custom fields
logContents.Add(entityClusterIDFieldName, m.serviceK8sMeta.clusterID)
logContents.Add(entityKindFieldName, kind)
logContents.Add(entityNameFieldName, name)
logContents.Add(entityCreationTimeFieldName, creationTime.Format(time.RFC3339))
}
func (m *metaCollector) processEntityLinkCommonPart(logContents models.LogContents, srcKind, srcNamespace, srcName, destKind, destNamespace, destName, method string, firstObservedTime, lastObservedTime int64) {
logContents.Add(entityLinkSrcDomainFieldName, m.serviceK8sMeta.domain)
logContents.Add(entityLinkSrcEntityTypeFieldName, m.genEntityTypeKey(srcKind))
logContents.Add(entityLinkSrcEntityIDFieldName, m.genKey(srcKind, srcNamespace, srcName))
logContents.Add(entityLinkDestDomainFieldName, m.serviceK8sMeta.domain)
logContents.Add(entityLinkDestEntityTypeFieldName, m.genEntityTypeKey(destKind))
logContents.Add(entityLinkDestEntityIDFieldName, m.genKey(destKind, destNamespace, destName))
logContents.Add(entityMethodFieldName, method)
logContents.Add(entityFirstObservedTimeFieldName, strconv.FormatInt(firstObservedTime, 10))
logContents.Add(entityLastObservedTimeFieldName, strconv.FormatInt(lastObservedTime, 10))
logContents.Add(entityKeepAliveSecondsFieldName, strconv.FormatInt(int64(m.serviceK8sMeta.Interval*2), 10))
logContents.Add(entityCategoryFieldName, defaultEntityLinkCategory)
}
func (m *metaCollector) processEntityJSONObject(obj interface{}) string {
if obj == nil {
return "{}"
}
objStr, err := json.Marshal(obj)
if err != nil {
logger.Error(context.Background(), "PROCESS_ENTITY_JSON_OBJECT_FAIL", "process entity json object fail", err)
return "{}"
}
return string(objStr)
}
func (m *metaCollector) processEntityJSONArray(obj []map[string]string) string {
if obj == nil {
return "[]"
}
objStr, err := json.Marshal(obj)
if err != nil {
logger.Error(context.Background(), "PROCESS_ENTITY_JSON_ARRAY_FAIL", "process entity json array fail", err)
return "[]"
}
return string(objStr)
}
func (m *metaCollector) send(event models.PipelineEvent, entity bool) {
var buffer chan models.PipelineEvent
if entity {
buffer = m.entityBuffer
} else {
buffer = m.entityLinkBuffer
}
select {
case buffer <- event:
case <-time.After(3 * time.Second):
logger.Error(context.Background(), "SEND_EVENT_TIMEOUT", "send event timeout, isEntity", entity)
}
}
func (m *metaCollector) sendInBackground() {
entityGroup := &models.PipelineGroupEvents{}
linkGroup := &models.PipelineGroupEvents{}
sendFunc := func(group *models.PipelineGroupEvents) {
for _, e := range group.Events {
// TODO: temporary convert from event group back to log, will fix after pipeline support Go input to C++ processor
log := convertPipelineEvent2Log(e)
m.collector.AddRawLog(log)
}
group.Events = group.Events[:0]
}
lastSendClusterTime := time.Now()
for {
select {
case e := <-m.entityBuffer:
entityGroup.Events = append(entityGroup.Events, e)
if len(entityGroup.Events) >= 100 {
m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events)))
sendFunc(entityGroup)
}
case e := <-m.entityLinkBuffer:
linkGroup.Events = append(linkGroup.Events, e)
if len(linkGroup.Events) >= 100 {
m.serviceK8sMeta.linkCount.Add(int64(len(linkGroup.Events)))
sendFunc(linkGroup)
}
case <-time.After(3 * time.Second):
if len(entityGroup.Events) > 0 {
m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events)))
sendFunc(entityGroup)
}
if len(linkGroup.Events) > 0 {
m.serviceK8sMeta.linkCount.Add(int64(len(linkGroup.Events)))
sendFunc(linkGroup)
}
case <-m.stopCh:
return
}
if time.Since(lastSendClusterTime) > time.Duration(m.serviceK8sMeta.Interval)*time.Second {
// send cluster entity if in infra domain
if m.serviceK8sMeta.domain == infraDomain {
clusterEntity := m.generateClusterEntity()
m.collector.AddRawLog(convertPipelineEvent2Log(clusterEntity))
lastSendClusterTime = time.Now()
}
}
}
}
func (m *metaCollector) genKey(kind, namespace, name string) string {
key := m.serviceK8sMeta.clusterID + kind + namespace + name
// #nosec G401
return fmt.Sprintf("%x", md5.Sum([]byte(key)))
}
func (m *metaCollector) generateClusterEntity() models.PipelineEvent {
log := &models.Log{}
log.Contents = models.NewLogContents()
log.Timestamp = uint64(time.Now().Unix())
log.Contents.Add(entityDomainFieldName, m.serviceK8sMeta.domain)
log.Contents.Add(entityTypeFieldName, m.genEntityTypeKey(clusterTypeName))
log.Contents.Add(entityIDFieldName, m.genKey("", "", ""))
log.Contents.Add(entityMethodFieldName, "Update")
log.Contents.Add(entityFirstObservedTimeFieldName, strconv.FormatInt(time.Now().Unix(), 10))
log.Contents.Add(entityLastObservedTimeFieldName, strconv.FormatInt(time.Now().Unix(), 10))
log.Contents.Add(entityKeepAliveSecondsFieldName, strconv.FormatInt(int64(m.serviceK8sMeta.Interval*2), 10))
log.Contents.Add(entityCategoryFieldName, defaultEntityCategory)
log.Contents.Add(entityClusterIDFieldName, m.serviceK8sMeta.clusterID)
return log
}
func (m *metaCollector) generateEntityClusterLink(entityEvent models.PipelineEvent) models.PipelineEvent {
content := entityEvent.(*models.Log).Contents
log := &models.Log{}
log.Contents = models.NewLogContents()
log.Contents.Add(entityLinkSrcDomainFieldName, m.serviceK8sMeta.domain)
log.Contents.Add(entityLinkSrcEntityTypeFieldName, m.genEntityTypeKey(clusterTypeName))
log.Contents.Add(entityLinkSrcEntityIDFieldName, m.genKey("", "", ""))
log.Contents.Add(entityLinkDestDomainFieldName, m.serviceK8sMeta.domain)
log.Contents.Add(entityLinkDestEntityTypeFieldName, content.Get(entityTypeFieldName))
log.Contents.Add(entityLinkDestEntityIDFieldName, content.Get(entityIDFieldName))
log.Contents.Add(entityLinkRelationTypeFieldName, "runs")
log.Contents.Add(entityMethodFieldName, content.Get(entityMethodFieldName))
log.Contents.Add(entityFirstObservedTimeFieldName, content.Get(entityFirstObservedTimeFieldName))
log.Contents.Add(entityLastObservedTimeFieldName, content.Get(entityLastObservedTimeFieldName))
log.Contents.Add(entityKeepAliveSecondsFieldName, m.serviceK8sMeta.Interval*2)
log.Contents.Add(entityCategoryFieldName, defaultEntityLinkCategory)
log.Timestamp = uint64(time.Now().Unix())
return log
}
func (m *metaCollector) genEntityTypeKey(kind string) string {
// assert domain is initialized
if kind == "" {
return m.serviceK8sMeta.domain + ".k8s"
}
if kind == clusterTypeName && m.serviceK8sMeta.domain == acsDomain {
return m.serviceK8sMeta.domain + "." + *flags.ClusterType + "." + clusterTypeName
}
return m.serviceK8sMeta.domain + ".k8s." + strings.ToLower(kind)
}
func convertPipelineEvent2Log(event models.PipelineEvent) *protocol.Log {
if modelLog, ok := event.(*models.Log); ok {
log := &protocol.Log{}
log.Contents = make([]*protocol.Log_Content, 0)
for k, v := range modelLog.Contents.Iterator() {
if _, ok := v.(string); !ok {
if intValue, ok := v.(int); !ok {
logger.Error(context.Background(), "COVERT_EVENT_TO_LOG_FAIL", "convert event to log fail, value is not string", v, "key", k)
continue
} else {
v = strconv.Itoa(intValue)
}
}
log.Contents = append(log.Contents, &protocol.Log_Content{Key: k, Value: v.(string)})
}
protocol.SetLogTime(log, uint32(modelLog.GetTimestamp()))
return log
}
return nil
}
func isEntity(resourceType string) bool {
return !strings.Contains(resourceType, k8smeta.LINK_SPLIT_CHARACTER)
}