pkg/helper/docker_center.go
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
	"context"
	"hash/fnv"
	"path/filepath"
	"regexp"
	"runtime"
	"sort"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/events"

	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/util"
)

var dockerCenterInstance *DockerCenter
var containerFindingManager *ContainerDiscoverManager
var onceDocker sync.Once

// set default value to aliyun_logs_
var envConfigPrefix = "aliyun_logs_"

const DockerTimeFormat = "2006-01-02T15:04:05.999999999Z"

var DefaultSyncContainersPeriod = time.Second * 3 // should be the same as the docker_config_update_interval gflag in C
var ContainerInfoDeletedTimeout = time.Second * time.Duration(120)
var EventListenerTimeout = time.Second * time.Duration(3600)

// Example of the kubernetes labels carried on a container:
// "io.kubernetes.pod.name": "logtail-z2224",
// "io.kubernetes.pod.namespace": "kube-system",
// "io.kubernetes.pod.uid": "222e88ff-8f08-11e8-851d-00163f008685",
const k8sPodNameLabel = "io.kubernetes.pod.name"
const k8sPodNameSpaceLabel = "io.kubernetes.pod.namespace"
const k8sPodUUIDLabel = "io.kubernetes.pod.uid"
const k8sInnerLabelPrefix = "io.kubernetes"
const k8sInnerAnnotationPrefix = "annotation."

const (
	ContainerStatusRunning = "running"
	ContainerStatusExited  = "exited"
)

type EnvConfigInfo struct {
	ConfigName    string
	ConfigItemMap map[string]string
}

// K8SFilter is used to find specific containers by kubernetes metadata.
type K8SFilter struct {
	NamespaceReg     *regexp.Regexp
	PodReg           *regexp.Regexp
	ContainerReg     *regexp.Regexp
	IncludeLabels    map[string]string
	ExcludeLabels    map[string]string
	IncludeLabelRegs map[string]*regexp.Regexp
	ExcludeLabelRegs map[string]*regexp.Regexp
	hashKey          uint64
}
// CreateK8SFilter compiles the regex and label filters and computes a hash
// key so that match results can be cached per filter.
func CreateK8SFilter(ns, pod, container string, includeK8sLabels, excludeK8sLabels map[string]string) (*K8SFilter, error) {
	var filter K8SFilter
	var err error
	var hashStrBuilder strings.Builder
	if len(ns) > 0 {
		if filter.NamespaceReg, err = regexp.Compile(ns); err != nil {
			return nil, err
		}
	}
	hashStrBuilder.WriteString(ns)
	hashStrBuilder.WriteString("$$$")
	if len(pod) > 0 {
		if filter.PodReg, err = regexp.Compile(pod); err != nil {
			return nil, err
		}
	}
	hashStrBuilder.WriteString(pod)
	hashStrBuilder.WriteString("$$$")
	if len(container) > 0 {
		if filter.ContainerReg, err = regexp.Compile(container); err != nil {
			return nil, err
		}
	}
	hashStrBuilder.WriteString(container)
	hashStrBuilder.WriteString("$$$")
	if filter.IncludeLabels, filter.IncludeLabelRegs, err = SplitRegexFromMap(includeK8sLabels); err != nil {
		return nil, err
	}
	for includeKey, val := range includeK8sLabels {
		hashStrBuilder.WriteString(includeKey)
		hashStrBuilder.WriteByte('#')
		hashStrBuilder.WriteString(val)
	}
	if filter.ExcludeLabels, filter.ExcludeLabelRegs, err = SplitRegexFromMap(excludeK8sLabels); err != nil {
		return nil, err
	}
	hashStrBuilder.WriteString("$$$")
	for excludeKey, val := range excludeK8sLabels {
		hashStrBuilder.WriteString(excludeKey)
		hashStrBuilder.WriteByte('#')
		hashStrBuilder.WriteString(val)
	}
	h := fnv.New64a()
	_, _ = h.Write([]byte(hashStrBuilder.String()))
	filter.hashKey = h.Sum64()
	return &filter, nil
}

// Example of the kubernetes labels carried on a container:
// "io.kubernetes.container.logpath": "/var/log/pods/222e88ff-8f08-11e8-851d-00163f008685/logtail_0.log",
// "io.kubernetes.container.name": "logtail",
// "io.kubernetes.docker.type": "container",
// "io.kubernetes.pod.name": "logtail-z2224",
// "io.kubernetes.pod.namespace": "kube-system",
// "io.kubernetes.pod.uid": "222e88ff-8f08-11e8-851d-00163f008685",
type K8SInfo struct {
	Namespace       string
	Pod             string
	ContainerName   string
	Labels          map[string]string
	PausedContainer bool

	matchedCache map[uint64]bool
	mu           sync.RWMutex
}

func (info *K8SInfo) IsSamePod(o *K8SInfo) bool {
	return info.Namespace == o.Namespace && info.Pod == o.Pod
}

func (info *K8SInfo) GetLabel(key string) string {
	info.mu.RLock()
	defer info.mu.RUnlock()
	if info.Labels != nil {
		return info.Labels[key]
	}
	return ""
}

// ExtractK8sLabels only works for original docker containers.
func (info *K8SInfo) ExtractK8sLabels(containerInfo types.ContainerJSON) {
	// only the pause container carries the k8s labels
	if info.ContainerName == "POD" || info.ContainerName == "pause" {
		info.mu.Lock()
		defer info.mu.Unlock()
		info.PausedContainer = true
		if info.Labels == nil {
			info.Labels = make(map[string]string)
		}
		for key, val := range containerInfo.Config.Labels {
			if strings.HasPrefix(key, k8sInnerLabelPrefix) || strings.HasPrefix(key, k8sInnerAnnotationPrefix) {
				continue
			}
			info.Labels[key] = val
		}
	}
}

func (info *K8SInfo) Merge(o *K8SInfo) {
	info.mu.Lock()
	o.mu.Lock()
	defer info.mu.Unlock()
	defer o.mu.Unlock()
	// only the pause container carries the k8s labels, so comparing len(Labels) is enough
	if len(o.Labels) > len(info.Labels) {
		info.Labels = o.Labels
		info.matchedCache = nil
	}
	if len(o.Labels) < len(info.Labels) {
		o.Labels = info.Labels
		o.matchedCache = nil
	}
}
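// A minimal usage sketch of CreateK8SFilter and IsMatch (illustrative only;
// the regex strings and pod metadata below are hypothetical, and it is
// assumed that SplitRegexFromMap accepts nil label maps):
//
//	filter, err := CreateK8SFilter("^default$", "^nginx-.*", "", nil, nil)
//	if err != nil {
//		// one of the regex strings failed to compile
//	}
//	info := &K8SInfo{Namespace: "default", Pod: "nginx-abc12", ContainerName: "nginx"}
//	matched := info.IsMatch(filter) // true; the result is cached under filter.hashKey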
// IsMatch checks this pod's metadata against the filter, caching the result
// under the filter's hash key.
func (info *K8SInfo) IsMatch(filter *K8SFilter) bool {
	if filter == nil {
		return true
	}
	info.mu.RLock() // take the read lock
	isPausedContainer := info.PausedContainer
	info.mu.RUnlock() // release the read lock
	if isPausedContainer {
		return false
	}
	info.mu.Lock()         // take the write lock
	defer info.mu.Unlock() // released on return
	if info.matchedCache == nil {
		info.matchedCache = make(map[uint64]bool)
	} else if cacheRst, ok := info.matchedCache[filter.hashKey]; ok {
		return cacheRst
	}
	var rst = info.innerMatch(filter)
	info.matchedCache[filter.hashKey] = rst
	return rst
}

// innerMatch does the real matching; the caller must hold the write lock.
func (info *K8SInfo) innerMatch(filter *K8SFilter) bool {
	if filter.NamespaceReg != nil && !filter.NamespaceReg.MatchString(info.Namespace) {
		return false
	}
	if filter.PodReg != nil && !filter.PodReg.MatchString(info.Pod) {
		return false
	}
	if filter.ContainerReg != nil && !filter.ContainerReg.MatchString(info.ContainerName) {
		return false
	}
	// if labels is nil, create an empty map
	if info.Labels == nil {
		info.Labels = make(map[string]string)
	}
	return isMapLabelsMatch(filter.IncludeLabels, filter.ExcludeLabels, filter.IncludeLabelRegs, filter.ExcludeLabelRegs, info.Labels)
}

type DockerInfoDetail struct {
	StdoutPath       string
	ContainerInfo    types.ContainerJSON
	ContainerNameTag map[string]string
	K8SInfo          *K8SInfo
	EnvConfigInfoMap map[string]*EnvConfigInfo
	ContainerIP      string
	DefaultRootPath  string

	lastUpdateTime time.Time
	deleteFlag     bool
}

func (did *DockerInfoDetail) IDPrefix() string {
	return GetShortID(did.ContainerInfo.ID)
}

func (did *DockerInfoDetail) PodName() string {
	if did.K8SInfo != nil {
		return did.K8SInfo.Pod
	}
	return ""
}

func (did *DockerInfoDetail) FinishedAt() string {
	if did.ContainerInfo.State != nil {
		return did.ContainerInfo.State.FinishedAt
	}
	return ""
}

func (did *DockerInfoDetail) Status() string {
	if did.ContainerInfo.State != nil {
		return did.ContainerInfo.State.Status
	}
	return ""
}

func (did *DockerInfoDetail) IsTimeout() bool {
	nowTime := time.Now()
	if nowTime.Sub(did.lastUpdateTime) > fetchAllSuccessTimeout ||
		(did.deleteFlag && nowTime.Sub(did.lastUpdateTime) > ContainerInfoDeletedTimeout) {
		return true
	}
	return false
}

func (did *DockerInfoDetail) GetExternalTags(envs, k8sLabels map[string]string) map[string]string {
	tags := map[string]string{}
	if len(envs) == 0 && len(k8sLabels) == 0 {
		return tags
	}
	did.GetCustomExternalTags(tags, envs, k8sLabels)
	return tags
}

func (did *DockerInfoDetail) GetCustomExternalTags(tags, envs, k8sLabels map[string]string) {
	if len(envs) == 0 && len(k8sLabels) == 0 {
		return
	}
	for k, realName := range envs {
		tags[realName] = did.GetEnv(k)
	}
	if did.K8SInfo != nil {
		for k, realName := range k8sLabels {
			tags[realName] = did.K8SInfo.GetLabel(k)
		}
	}
}

func (did *DockerInfoDetail) GetEnv(key string) string {
	for _, env := range did.ContainerInfo.Config.Env {
		kvPair := strings.SplitN(env, "=", 2)
		if len(kvPair) != 2 {
			continue
		}
		if key == kvPair[0] {
			return kvPair[1]
		}
	}
	return ""
}

func (did *DockerInfoDetail) DiffName(other *DockerInfoDetail) bool {
	return did.ContainerInfo.ID != other.ContainerInfo.ID || did.ContainerInfo.Name != other.ContainerInfo.Name
}

func (did *DockerInfoDetail) DiffMount(other *DockerInfoDetail) bool {
	return len(did.ContainerInfo.Config.Volumes) != len(other.ContainerInfo.Config.Volumes)
}

func isPathSeparator(c byte) bool {
	return c == '/' || c == '\\'
}

// FindBestMatchedPath returns the mount whose destination is the longest
// prefix of pth, or the container's default root path when no mount matches.
func (did *DockerInfoDetail) FindBestMatchedPath(pth string) (sourcePath, containerPath string) {
	pth = filepath.Clean(pth)
	pthSize := len(pth)

	// logger.Debugf(context.Background(), "FindBestMatchedPath for container %s, target path: %s, containerInfo: %+v", did.IDPrefix(), pth, did.ContainerInfo)

	// check mounts
	var bestMatchedMounts types.MountPoint
	for _, mount := range did.ContainerInfo.Mounts {
		// logger.Debugf("container(%s-%s) mount: source-%s destination-%s", did.IDPrefix(), did.ContainerInfo.Name, mount.Source, mount.Destination)
		dst := filepath.Clean(mount.Destination)
		dstSize := len(dst)
		if strings.HasPrefix(pth, dst) &&
			(pthSize == dstSize || (pthSize > dstSize && isPathSeparator(pth[dstSize]))) &&
			len(bestMatchedMounts.Destination) < dstSize {
			bestMatchedMounts = mount
		}
	}
	if len(bestMatchedMounts.Source) > 0 {
		return bestMatchedMounts.Source, bestMatchedMounts.Destination
	}

	return did.DefaultRootPath, ""
}

func (did *DockerInfoDetail) MakeSureEnvConfigExist(configName string) *EnvConfigInfo {
	if did.EnvConfigInfoMap == nil {
		did.EnvConfigInfoMap = make(map[string]*EnvConfigInfo)
	}
	config, ok := did.EnvConfigInfoMap[configName]
	if !ok {
		envConfig := &EnvConfigInfo{
			ConfigName:    configName,
			ConfigItemMap: make(map[string]string),
		}
		did.EnvConfigInfoMap[configName] = envConfig
		return envConfig
	}
	return config
}

// FindAllEnvConfig finds and pre-processes all env configs, and adds tags to the docker info.
func (did *DockerInfoDetail) FindAllEnvConfig(envConfigPrefix string, selfConfigFlag bool) {
	if len(envConfigPrefix) == 0 {
		return
	}
	selfEnvConfig := false
	for _, env := range did.ContainerInfo.Config.Env {
		kvPair := strings.SplitN(env, "=", 2)
		if len(kvPair) != 2 {
			continue
		}
		key := kvPair[0]
		value := kvPair[1]

		if key == "ALICLOUD_LOG_DOCKER_ENV_CONFIG_SELF" && (value == "true" || value == "TRUE") {
			logger.Debug(context.Background(), "this container is self env config", did.ContainerInfo.Name)
			selfEnvConfig = true
			continue
		}
		if !strings.HasPrefix(key, envConfigPrefix) {
			continue
		}
		logger.Debug(context.Background(), "docker env config, name", did.ContainerInfo.Name, "item", key)
		envKey := key[len(envConfigPrefix):]
		lastIndex := strings.LastIndexByte(envKey, '_')
		var configName string
		// ends with '_', invalid, just skip
		if lastIndex == len(envKey)-1 {
			continue
		}
		// raw config
		if lastIndex < 0 {
			configName = envKey
		} else {
			configName = envKey[0:lastIndex]
		}
		// invalid config name
		if len(configName) == 0 {
			continue
		}
		envConfig := did.MakeSureEnvConfigExist(configName)
		if lastIndex < 0 {
			envConfig.ConfigItemMap[""] = value
		} else {
			tailKey := envKey[lastIndex+1:]
			envConfig.ConfigItemMap[tailKey] = value
			// process tags
			if tailKey == "tags" {
				tagKV := strings.SplitN(value, "=", 2)
				// if the tag already exists in EnvTags, just skip it
				if len(tagKV) == 2 {
					if !HasEnvTags(tagKV[0], tagKV[1]) {
						did.ContainerNameTag[tagKV[0]] = tagKV[1]
					} else {
						logger.Info(context.Background(), "skip set this tag, as it exists in self env tags, key", tagKV[0], "value", tagKV[1])
					}
				} else {
					if !HasEnvTags(tagKV[0], tagKV[0]) {
						did.ContainerNameTag[tagKV[0]] = tagKV[0]
					} else {
						logger.Info(context.Background(), "skip set this tag, as it exists in self env tags, key&value", tagKV[0])
					}
				}
			}
		}
	}
	logger.Debug(context.Background(), "docker env", did.ContainerInfo.Config.Env, "prefix", envConfigPrefix, "env config", did.EnvConfigInfoMap, "self env config", selfEnvConfig)
	// ignore self env config
	if !selfConfigFlag && selfEnvConfig {
		did.EnvConfigInfoMap = make(map[string]*EnvConfigInfo)
	}
}
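// An illustrative walkthrough of the env config naming convention handled by
// FindAllEnvConfig, assuming the default prefix "aliyun_logs_" (the config
// name "catalina" and its values below are made up):
//
//	aliyun_logs_catalina=stdout          // raw config: EnvConfigInfo "catalina", ConfigItemMap[""] = "stdout"
//	aliyun_logs_catalina_tags=app=tomcat // item config: ConfigItemMap["tags"] = "app=tomcat",
//	                                     // and the tag app=tomcat is added to ContainerNameTag
//	                                     // unless it already exists in the self env tags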
type DockerCenter struct {
	// containerMap contains all container information.
	// For the docker scenario, the container list is the same as the result of `docker ps`, so the
	// container list also contains the sandbox containers when docker is used as the engine in Kubernetes.
	// For the CRI scenario, the container list only contains the real containers and excludes the sandbox
	// containers, but the sandbox meta is saved to its bound container.
	containerMap                   map[string]*DockerInfoDetail // all containers are kept in this map
	client                         DockerCenterClientInterface
	containerHelper                ContainerHelperInterface
	lastErrMu                      sync.Mutex
	lastErr                        error
	lock                           sync.RWMutex
	lastUpdateMapTime              int64
	eventChan                      chan events.Message
	eventChanLock                  sync.Mutex
	containerStateLock             sync.Mutex
	imageLock                      sync.RWMutex
	imageCache                     map[string]string
	initStaticContainerInfoSuccess bool
}

type DockerCenterClientInterface interface {
	ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error)
	ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error)
	ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error)
	Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error)
}

type ContainerHelperInterface interface {
	ContainerProcessAlive(pid int) bool
}

type ContainerHelperWrapper struct {
}

func (r *ContainerHelperWrapper) ContainerProcessAlive(pid int) bool {
	return ContainerProcessAlive(pid)
}

func getIPByHosts(hostFileName, hostname string) string {
	lines, err := util.ReadLines(hostFileName)
	if err != nil {
		logger.Info(context.Background(), "read container hosts file error, file", hostFileName, "error", err.Error())
		return ""
	}
	for _, line := range lines {
		if strings.HasPrefix(line, "#") {
			continue
		}
		if strings.Index(line, hostname) > 0 {
			line = strings.Trim(line, "#/* \t\n")
			return util.ReadFirstBlock(line)
		}
	}
	if util.GetHostName() == hostname {
		return util.GetIPAddress()
	}
	return ""
}

func (dc *DockerCenter) registerEventListener(c chan events.Message) {
	dc.eventChanLock.Lock()
	defer dc.eventChanLock.Unlock()
	dc.eventChan = c
}

func (dc *DockerCenter) unRegisterEventListener(_ chan events.Message) {
	dc.eventChanLock.Lock()
	defer dc.eventChanLock.Unlock()
	dc.eventChan = nil
}

func (dc *DockerCenter) lookupImageCache(id string) (string, bool) {
	dc.imageLock.RLock()
	defer dc.imageLock.RUnlock()
	imageName, ok := dc.imageCache[id]
	return imageName, ok
}

func (dc *DockerCenter) getImageName(id, defaultVal string) string {
	if len(id) == 0 || dc.client == nil {
		return defaultVal
	}
	if imageName, ok := dc.lookupImageCache(id); ok {
		return imageName
	}
	image, _, err := dc.client.ImageInspectWithRaw(context.Background(), id)
	logger.Debug(context.Background(), "get image name, id", id, "error", err)
	if err == nil && len(image.RepoTags) > 0 {
		dc.imageLock.Lock()
		dc.imageCache[id] = image.RepoTags[0]
		dc.imageLock.Unlock()
		return image.RepoTags[0]
	}
	return defaultVal
}

func (dc *DockerCenter) getIPAddress(info types.ContainerJSON) string {
	if detail, ok := dc.getContainerDetail(info.ID); ok && detail != nil {
		return detail.ContainerIP
	}
	if info.NetworkSettings != nil && len(info.NetworkSettings.IPAddress) > 0 {
		return info.NetworkSettings.IPAddress
	}
	if len(info.Config.Hostname) > 0 && len(info.HostsPath) > 0 {
		return getIPByHosts(GetMountedFilePath(info.HostsPath), info.Config.Hostname)
	}
	return ""
}
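// Because DockerCenter reaches the engine only through
// DockerCenterClientInterface, a test can substitute a stub for a real docker
// daemon. A hypothetical sketch (stubClient and its return values are made up):
//
//	type stubClient struct{}
//
//	func (s *stubClient) ContainerList(ctx context.Context, options types.ContainerListOptions) ([]types.Container, error) {
//		return []types.Container{{ID: "0123456789ab"}}, nil
//	}
//	func (s *stubClient) ImageInspectWithRaw(ctx context.Context, imageID string) (types.ImageInspect, []byte, error) {
//		return types.ImageInspect{RepoTags: []string{"nginx:latest"}}, nil, nil
//	}
//	func (s *stubClient) ContainerInspect(ctx context.Context, containerID string) (types.ContainerJSON, error) {
//		return types.ContainerJSON{}, nil
//	}
//	func (s *stubClient) Events(ctx context.Context, options types.EventsOptions) (<-chan events.Message, <-chan error) {
//		return nil, nil
//	}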
// CreateInfoDetail creates a DockerInfoDetail from a docker container.
// Container properties used in this function: HostsPath, Config.Hostname, Name, Config.Image, Config.Env,
// Mounts, GraphDriver.Data["UpperDir"] and Config.Labels.
func (dc *DockerCenter) CreateInfoDetail(info types.ContainerJSON, envConfigPrefix string, selfConfigFlag bool) *DockerInfoDetail {
	// generate log tags
	containerNameTag := make(map[string]string)
	k8sInfo := K8SInfo{}
	ip := dc.getIPAddress(info)

	containerNameTag["_image_name_"] = dc.getImageName(info.Image, info.Config.Image)
	if strings.HasPrefix(info.Name, "/k8s_") || strings.HasPrefix(info.Name, "k8s_") || strings.Count(info.Name, "_") >= 4 {
		// 1. the container name is in the k8s form, e.g.:
		// k8s_php-redis_frontend-2337258262-154p7_default_d8a2e2dd-3617-11e7-a4b0-ecf4bbe5d414_0
		// terway_terway-multi-ip-mgslw_kube-system_b07b491e-995a-11e9-94ea-00163e080931_8
		tags := strings.SplitN(info.Name, "_", 6)
		// containerNamePrefix: k8s
		// containerName: php-redis
		// podFullName: frontend-2337258262-154p7
		// computeHash: 154p7
		// deploymentName: frontend
		// replicaSetName: frontend-2337258262
		// namespace: default
		// podUID: d8a2e2dd-3617-11e7-a4b0-ecf4bbe5d414
		baseIndex := 0
		if len(tags) == 6 {
			baseIndex = 1
		}
		containerNameTag["_container_name_"] = tags[baseIndex]
		containerNameTag["_pod_name_"] = tags[baseIndex+1]
		containerNameTag["_namespace_"] = tags[baseIndex+2]
		containerNameTag["_pod_uid_"] = tags[baseIndex+3]
		k8sInfo.ContainerName = tags[baseIndex]
		k8sInfo.Pod = tags[baseIndex+1]
		k8sInfo.Namespace = tags[baseIndex+2]
		k8sInfo.ExtractK8sLabels(info)
	} else if _, ok := info.Config.Labels[k8sPodNameLabel]; ok {
		// 2. the container labels carry k8sPodNameLabel
		containerNameTag["_container_name_"] = info.Name
		containerNameTag["_pod_name_"] = info.Config.Labels[k8sPodNameLabel]
		containerNameTag["_namespace_"] = info.Config.Labels[k8sPodNameSpaceLabel]
		containerNameTag["_pod_uid_"] = info.Config.Labels[k8sPodUUIDLabel]
		k8sInfo.ContainerName = info.Name
		k8sInfo.Pod = info.Config.Labels[k8sPodNameLabel]
		k8sInfo.Namespace = info.Config.Labels[k8sPodNameSpaceLabel]
		// the following method is coupled with the CRI adapter: only the original docker container labels
		// are added to the labels of the k8s info.
		k8sInfo.ExtractK8sLabels(info)
	} else {
		// 3. treat it as a normal container
		if strings.HasPrefix(info.Name, "/") {
			containerNameTag["_container_name_"] = info.Name[1:]
		} else {
			containerNameTag["_container_name_"] = info.Name
		}
	}
	if len(ip) > 0 {
		containerNameTag["_container_ip_"] = ip
	}

	for i := range info.Mounts {
		info.Mounts[i].Source = filepath.Clean(info.Mounts[i].Source)
		info.Mounts[i].Destination = filepath.Clean(info.Mounts[i].Destination)
	}
	sortMounts := func(mounts []types.MountPoint) {
		sort.Slice(mounts, func(i, j int) bool {
			return mounts[i].Source < mounts[j].Source
		})
	}
	sortMounts(info.Mounts)

	did := &DockerInfoDetail{
		StdoutPath:       info.LogPath,
		ContainerInfo:    info,
		ContainerNameTag: containerNameTag,
		K8SInfo:          &k8sInfo,
		ContainerIP:      ip,
		lastUpdateTime:   time.Now(),
	}

	// find env log configs
	did.FindAllEnvConfig(envConfigPrefix, selfConfigFlag)

	// find the container's FS root path on the host
	// @note for overlayfs only; for some drivers like nas, it cannot be seen in the upper dir
	if info.GraphDriver.Data != nil {
		if rootPath, ok := did.ContainerInfo.GraphDriver.Data["UpperDir"]; ok {
			did.DefaultRootPath = rootPath
		}
	}
	// for cri-runtime
	if criRuntimeWrapper != nil && info.HostConfig != nil && len(did.DefaultRootPath) == 0 {
		did.DefaultRootPath = criRuntimeWrapper.lookupContainerRootfsAbsDir(info)
	}
	logger.Debugf(context.Background(), "container(id: %s, name: %s) default root path is %s", info.ID, info.Name, did.DefaultRootPath)
	return did
}
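// A worked example of branch 1 above, using the sample name from the comments
// (for illustration only):
//
//	// info.Name: "/k8s_php-redis_frontend-2337258262-154p7_default_d8a2e2dd-3617-11e7-a4b0-ecf4bbe5d414_0"
//	// strings.SplitN(info.Name, "_", 6) yields 6 fields, so baseIndex is 1:
//	//   _container_name_ = "php-redis"
//	//   _pod_name_       = "frontend-2337258262-154p7"
//	//   _namespace_      = "default"
//	//   _pod_uid_        = "d8a2e2dd-3617-11e7-a4b0-ecf4bbe5d414"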
func getDockerCenterInstance() *DockerCenter {
	onceDocker.Do(func() {
		logger.InitLogger()
		// load EnvTags first
		LoadEnvTags()
		dockerCenterInstance = &DockerCenter{
			containerHelper: &ContainerHelperWrapper{},
		}
		dockerCenterInstance.imageCache = make(map[string]string)
		dockerCenterInstance.containerMap = make(map[string]*DockerInfoDetail)
		// containerFindingManager works in a producer-consumer model,
		// so even if the manager is not initialized, consumers like service_stdout are not affected
		go func() {
			retryCount := 0
			containerFindingManager = NewContainerDiscoverManager()
			for {
				if containerFindingManager.Init() {
					break
				}
				if retryCount%10 == 0 {
					logger.Error(context.Background(), "DOCKER_CENTER_ALARM", "docker center init failed", "retry count", retryCount)
				}
				retryCount++
				time.Sleep(time.Second * 1)
			}
			containerFindingManager.TimerFetch()
			containerFindingManager.StartSyncContainers()
		}()
	})
	return dockerCenterInstance
}

func SetEnvConfigPrefix(prefix string) {
	envConfigPrefix = prefix
}

func (dc *DockerCenter) readStaticConfig(forceFlush bool) {
	staticDockerContainerLock.Lock()
	defer staticDockerContainerLock.Unlock()
	containerInfo, removedIDs, changed, err := tryReadStaticContainerInfo()
	if err != nil {
		logger.Warning(context.Background(), "READ_STATIC_CONFIG_ALARM", "read static container info error", err)
	}
	if !dc.initStaticContainerInfoSuccess && len(containerInfo) > 0 {
		dc.initStaticContainerInfoSuccess = true
		forceFlush = true
	}
	// container info can only be read from the static file as a whole, so use updateContainers for a full update
	if forceFlush || changed {
		containerMap := make(map[string]*DockerInfoDetail)
		for _, info := range containerInfo {
			dockerInfoDetail := dockerCenterInstance.CreateInfoDetail(info, envConfigPrefix, false)
			containerMap[info.ID] = dockerInfoDetail
		}
		dockerCenterInstance.updateContainers(containerMap)
	}

	if len(removedIDs) > 0 {
		for _, id := range removedIDs {
			dockerCenterInstance.markRemove(id)
		}
	}
}

func (dc *DockerCenter) flushStaticConfig() {
	for {
		dc.readStaticConfig(false)
		time.Sleep(time.Second)
	}
}
func (dc *DockerCenter) setLastError(err error, msg string) {
	dc.lastErrMu.Lock()
	dc.lastErr = err
	dc.lastErrMu.Unlock()
	if err != nil {
		logger.Warning(context.Background(), "DOCKER_CENTER_ALARM", "message", msg, "error found", err)
	} else {
		logger.Debug(context.Background(), "message", msg)
	}
}

func isMapLabelsMatch(includeLabel map[string]string,
	excludeLabel map[string]string,
	includeLabelRegex map[string]*regexp.Regexp,
	excludeLabelRegex map[string]*regexp.Regexp,
	labels map[string]string) bool {
	if len(includeLabel) != 0 || len(includeLabelRegex) != 0 {
		matchedFlag := false
		for key, val := range includeLabel {
			if dockerVal, ok := labels[key]; ok && (len(val) == 0 || dockerVal == val) {
				matchedFlag = true
				break
			}
		}
		// if already matched, the regexes need not be checked
		if !matchedFlag {
			for key, reg := range includeLabelRegex {
				if dockerVal, ok := labels[key]; ok && reg.MatchString(dockerVal) {
					matchedFlag = true
					break
				}
			}
		}
		if !matchedFlag {
			return false
		}
	}
	for key, val := range excludeLabel {
		if dockerVal, ok := labels[key]; ok && (len(val) == 0 || dockerVal == val) {
			return false
		}
	}
	for key, reg := range excludeLabelRegex {
		if dockerVal, ok := labels[key]; ok && reg.MatchString(dockerVal) {
			return false
		}
	}
	return true
}

func isContainerLabelMatch(includeLabel map[string]string,
	excludeLabel map[string]string,
	includeLabelRegex map[string]*regexp.Regexp,
	excludeLabelRegex map[string]*regexp.Regexp,
	info *DockerInfoDetail) bool {
	return isMapLabelsMatch(includeLabel, excludeLabel, includeLabelRegex, excludeLabelRegex, info.ContainerInfo.Config.Labels)
}

func isMatchEnvItem(env string,
	staticEnv map[string]string,
	regexEnv map[string]*regexp.Regexp) bool {
	var envKey, envValue string
	splitArray := strings.SplitN(env, "=", 2)
	if len(splitArray) < 2 {
		envKey = splitArray[0]
	} else {
		envKey = splitArray[0]
		envValue = splitArray[1]
	}
	if len(staticEnv) > 0 {
		if value, ok := staticEnv[envKey]; ok && (len(value) == 0 || value == envValue) {
			return true
		}
	}
	if len(regexEnv) > 0 {
		if reg, ok := regexEnv[envKey]; ok && reg.MatchString(envValue) {
			return true
		}
	}
	return false
}

func isContainerEnvMatch(includeEnv map[string]string,
	excludeEnv map[string]string,
	includeEnvRegex map[string]*regexp.Regexp,
	excludeEnvRegex map[string]*regexp.Regexp,
	info *DockerInfoDetail) bool {
	if len(includeEnv) != 0 || len(includeEnvRegex) != 0 {
		matchFlag := false
		for _, env := range info.ContainerInfo.Config.Env {
			if isMatchEnvItem(env, includeEnv, includeEnvRegex) {
				matchFlag = true
				break
			}
		}
		if !matchFlag {
			return false
		}
	}
	if len(excludeEnv) != 0 || len(excludeEnvRegex) != 0 {
		for _, env := range info.ContainerInfo.Config.Env {
			if isMatchEnvItem(env, excludeEnv, excludeEnvRegex) {
				return false
			}
		}
	}
	return true
}

func (dc *DockerCenter) getAllAcceptedInfo(
	includeLabel map[string]string,
	excludeLabel map[string]string,
	includeLabelRegex map[string]*regexp.Regexp,
	excludeLabelRegex map[string]*regexp.Regexp,
	includeEnv map[string]string,
	excludeEnv map[string]string,
	includeEnvRegex map[string]*regexp.Regexp,
	excludeEnvRegex map[string]*regexp.Regexp,
	k8sFilter *K8SFilter,
) map[string]*DockerInfoDetail {
	containerMap := make(map[string]*DockerInfoDetail)
	dc.lock.RLock()
	defer dc.lock.RUnlock()
	for id, info := range dc.containerMap {
		if isContainerLabelMatch(includeLabel, excludeLabel, includeLabelRegex, excludeLabelRegex, info) &&
			isContainerEnvMatch(includeEnv, excludeEnv, includeEnvRegex, excludeEnvRegex, info) &&
			info.K8SInfo.IsMatch(k8sFilter) {
			containerMap[id] = info
		}
	}
	return containerMap
}
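// Illustrative semantics of isMapLabelsMatch (the labels below are hypothetical):
//
//	labels := map[string]string{"app": "nginx", "tier": "frontend"}
//	// an empty expected value only requires the key to exist:
//	isMapLabelsMatch(map[string]string{"app": ""}, nil, nil, nil, labels) // true
//	// exclusion wins even when an include rule matched:
//	isMapLabelsMatch(map[string]string{"app": ""}, map[string]string{"tier": "frontend"}, nil, nil, labels) // false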
func (dc *DockerCenter) getAllAcceptedInfoV2(
	fullList map[string]bool,
	matchList map[string]*DockerInfoDetail,
	includeLabel map[string]string,
	excludeLabel map[string]string,
	includeLabelRegex map[string]*regexp.Regexp,
	excludeLabelRegex map[string]*regexp.Regexp,
	includeEnv map[string]string,
	excludeEnv map[string]string,
	includeEnvRegex map[string]*regexp.Regexp,
	excludeEnvRegex map[string]*regexp.Regexp,
	k8sFilter *K8SFilter,
) (newCount, delCount int, matchAddedList, matchDeletedList []string) {
	dc.lock.RLock()
	defer dc.lock.RUnlock()

	matchDeletedList = make([]string, 0)
	matchAddedList = make([]string, 0)

	// remove deleted containers from the match list and the full list
	delCount = 0
	for id := range fullList {
		if _, exist := dc.containerMap[id]; !exist {
			delete(fullList, id)
			if _, matched := matchList[id]; matched {
				delete(matchList, id)
				matchDeletedList = append(matchDeletedList, id)
				delCount++
			}
		}
	}

	// update the status of matched containers
	for id := range matchList {
		c, ok := dc.containerMap[id]
		if ok {
			matchList[id] = c
		} else {
			logger.Warningf(context.Background(), "DOCKER_MATCH_ALARM", "matched container not in docker center")
		}
	}

	// add new containers to the full list, and matched ones to the match list
	newCount = 0
	for id, info := range dc.containerMap {
		if _, exist := fullList[id]; !exist {
			fullList[id] = true
			if isContainerLabelMatch(includeLabel, excludeLabel, includeLabelRegex, excludeLabelRegex, info) &&
				isContainerEnvMatch(includeEnv, excludeEnv, includeEnvRegex, excludeEnvRegex, info) &&
				info.K8SInfo.IsMatch(k8sFilter) {
				newCount++
				matchList[id] = info
				matchAddedList = append(matchAddedList, id)
			}
		}
	}
	return newCount, delCount, matchAddedList, matchDeletedList
}

func (dc *DockerCenter) getDiffContainers(fullList map[string]struct{}) (fullAddedList, fullDeletedList []string) {
	dc.lock.RLock()
	defer dc.lock.RUnlock()
	fullDeletedList = make([]string, 0)
	fullAddedList = make([]string, 0)
	for id := range fullList {
		if _, exist := dc.containerMap[id]; !exist {
			delete(fullList, id)
			fullDeletedList = append(fullDeletedList, id)
		}
	}
	for id := range dc.containerMap {
		if _, exist := fullList[id]; !exist {
			fullList[id] = struct{}{}
			fullAddedList = append(fullAddedList, id)
		}
	}
	return fullAddedList, fullDeletedList
}

func (dc *DockerCenter) getAllSpecificInfo(filter func(*DockerInfoDetail) bool) (infoList []*DockerInfoDetail) {
	dc.lock.RLock()
	defer dc.lock.RUnlock()
	for _, info := range dc.containerMap {
		if filter(info) {
			infoList = append(infoList, info)
		}
	}
	return infoList
}

func (dc *DockerCenter) processAllContainerInfo(processor func(*DockerInfoDetail)) {
	dc.lock.RLock()
	defer dc.lock.RUnlock()
	for _, info := range dc.containerMap {
		processor(info)
	}
}

func (dc *DockerCenter) getContainerDetail(id string) (containerDetail *DockerInfoDetail, ok bool) {
	dc.lock.RLock()
	defer dc.lock.RUnlock()
	containerDetail, ok = dc.containerMap[id]
	return
}

func (dc *DockerCenter) getLastUpdateMapTime() int64 {
	return atomic.LoadInt64(&dc.lastUpdateMapTime)
}

func (dc *DockerCenter) refreshLastUpdateMapTime() {
	atomic.StoreInt64(&dc.lastUpdateMapTime, time.Now().UnixNano())
}
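// A usage sketch for getAllAcceptedInfoV2 above (illustrative; dc is an
// assumed *DockerCenter). The caller owns fullList/matchList and keeps them
// across polls, so only diffs are reported:
//
//	fullList := make(map[string]bool)               // every container id ever seen
//	matchList := make(map[string]*DockerInfoDetail) // ids currently matching the filters
//	newCount, delCount, added, deleted := dc.getAllAcceptedInfoV2(
//		fullList, matchList,
//		nil, nil, nil, nil, // label filters
//		nil, nil, nil, nil, // env filters
//		nil,                // k8s filter
//	)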
func (dc *DockerCenter) updateContainers(containerMap map[string]*DockerInfoDetail) {
	dc.lock.Lock()
	defer dc.lock.Unlock()
	for key, container := range dc.containerMap {
		// check removed keys
		if _, ok := containerMap[key]; !ok {
			if !container.IsTimeout() {
				// not timed out yet, keep it in the new map
				containerMap[key] = container
			}
		}
	}
	// switch to the new container map
	if logger.DebugFlag() {
		for i, c := range containerMap {
			logger.Debugf(context.Background(), "Update all containers [%v]: id:%v\tname:%v\tcreated:%v\tstatus:%v detail=%+v",
				i, c.IDPrefix(), c.ContainerInfo.Name, c.ContainerInfo.Created, c.Status(), c.ContainerInfo)
		}
	}
	dc.containerMap = containerMap
	dc.mergeK8sInfo()
	dc.refreshLastUpdateMapTime()
}

func (dc *DockerCenter) mergeK8sInfo() {
	k8sInfoMap := make(map[string][]*K8SInfo)
	for _, container := range dc.containerMap {
		if container.K8SInfo == nil {
			continue
		}
		key := container.K8SInfo.Namespace + "@" + container.K8SInfo.Pod
		k8sInfoMap[key] = append(k8sInfoMap[key], container.K8SInfo)
	}
	for key, k8sInfo := range k8sInfoMap {
		if len(k8sInfo) < 2 {
			logger.Debug(context.Background(), "k8s pod's container count < 2", key)
			continue
		}
		// @note we need to test pods with many sidecar containers
		for i := 1; i < len(k8sInfo); i++ {
			k8sInfo[0].Merge(k8sInfo[i])
		}
		for i := 1; i < len(k8sInfo); i++ {
			k8sInfo[i].Merge(k8sInfo[0])
		}
	}
}

func (dc *DockerCenter) updateContainer(id string, container *DockerInfoDetail) {
	dc.lock.Lock()
	defer dc.lock.Unlock()
	if container.K8SInfo != nil {
		if _, ok := dc.containerMap[id]; !ok {
			for _, oldContainer := range dc.containerMap {
				if oldContainer.K8SInfo != nil && oldContainer.K8SInfo.IsSamePod(container.K8SInfo) {
					oldContainer.K8SInfo.Merge(container.K8SInfo)
				}
			}
		}
	}
	if logger.DebugFlag() {
		// bytes, _ := json.Marshal(container)
		// logger.Debug(context.Background(), "update container info", string(bytes))
		logger.Debugf(context.Background(), "Update one container: id:%v\tname:%v\tcreated:%v\tstatus:%v detail=%+v",
			container.IDPrefix(), container.ContainerInfo.Name, container.ContainerInfo.Created, container.Status(), container.ContainerInfo)
	}
	dc.containerMap[id] = container
	dc.refreshLastUpdateMapTime()
}

func (dc *DockerCenter) fetchAll() error {
	dc.containerStateLock.Lock()
	defer dc.containerStateLock.Unlock()
	containers, err := dc.client.ContainerList(context.Background(), types.ContainerListOptions{All: true})
	if err != nil {
		dc.setLastError(err, "list container error")
		return err
	}
	logger.Debug(context.Background(), "fetch all", containers)
	var containerMap = make(map[string]*DockerInfoDetail)
	for _, container := range containers {
		var containerDetail types.ContainerJSON
		for idx := 0; idx < 3; idx++ {
			if containerDetail, err = dc.client.ContainerInspect(context.Background(), container.ID); err == nil {
				break
			}
			time.Sleep(time.Second * 5)
		}
		if err == nil {
			if !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
				continue
			}
			containerMap[container.ID] = dc.CreateInfoDetail(containerDetail, envConfigPrefix, false)
		} else {
			dc.setLastError(err, "inspect container error "+container.ID)
		}
	}
	dc.updateContainers(containerMap)
	return nil
}
func (dc *DockerCenter) fetchOne(containerID string, tryFindSandbox bool) error {
	dc.containerStateLock.Lock()
	defer dc.containerStateLock.Unlock()
	containerDetail, err := dc.client.ContainerInspect(context.Background(), containerID)
	if err != nil {
		dc.setLastError(err, "inspect container error "+containerID)
		return err
	}
	if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
		containerDetail.State.Status = ContainerStatusExited
	}
	// In the docker scenario:
	// if tryFindSandbox is false, fetchOne is expected to be called twice, once with the sandbox id and
	// once with the business container id;
	// if tryFindSandbox is true, the caller only passes the business container id, and fetchOne fills in
	// the sandbox info by itself.
	dc.updateContainer(containerID, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false))
	logger.Debug(context.Background(), "update container", containerID, "detail", containerDetail)
	if tryFindSandbox && containerDetail.Config != nil {
		if id := containerDetail.Config.Labels["io.kubernetes.sandbox.id"]; id != "" {
			containerDetail, err = dc.client.ContainerInspect(context.Background(), id)
			if err != nil {
				dc.setLastError(err, "inspect sandbox container error "+id)
			} else {
				if containerDetail.State.Status == ContainerStatusRunning && !dc.containerHelper.ContainerProcessAlive(containerDetail.State.Pid) {
					containerDetail.State.Status = ContainerStatusExited
				}
				dc.updateContainer(id, dc.CreateInfoDetail(containerDetail, envConfigPrefix, false))
				logger.Debug(context.Background(), "update sandbox container", id, "detail", containerDetail)
			}
		}
	}
	return err
}

// We mark a container removed if it has exited or its metadata cannot be accessed,
// e.g. it cannot be inspected via docker inspect / crictl inspect.
func (dc *DockerCenter) markRemove(containerID string) {
	dc.lock.Lock()
	defer dc.lock.Unlock()
	if container, ok := dc.containerMap[containerID]; ok {
		if container.deleteFlag {
			return
		}
		logger.Debugf(context.Background(), "mark remove container: id:%v\tname:%v\tcreated:%v\tstatus:%v detail=%+v",
			container.IDPrefix(), container.ContainerInfo.Name, container.ContainerInfo.Created, container.Status(), container.ContainerInfo)
		container.ContainerInfo.State.Status = ContainerStatusExited
		container.deleteFlag = true
		container.lastUpdateTime = time.Now()
		dc.refreshLastUpdateMapTime()
	}
}

func (dc *DockerCenter) cleanTimeoutContainer() {
	dc.lock.Lock()
	defer dc.lock.Unlock()
	hasDelete := false
	for key, container := range dc.containerMap {
		// Confirm the deletion when:
		// 1. The container has been marked deleted for a while.
		// 2. The last successful fetch-all is too old.
		if container.IsTimeout() {
			logger.Debugf(context.Background(), "delete container, id:%v\tname:%v\tcreated:%v\tstatus:%v\tdetail:%+v",
				container.IDPrefix(), container.ContainerInfo.Name, container.ContainerInfo.Created, container.Status(), container.ContainerInfo)
			delete(dc.containerMap, key)
			hasDelete = true
		}
	}
	if hasDelete {
		dc.refreshLastUpdateMapTime()
	}
}

func (dc *DockerCenter) sweepCache() {
	// clear image cache entries that no container references anymore
	usedImageIDSet := make(map[string]bool)
	dc.lock.RLock()
	for _, container := range dc.containerMap {
		usedImageIDSet[container.ContainerInfo.Image] = true
	}
	dc.lock.RUnlock()

	dc.imageLock.Lock()
	for key := range dc.imageCache {
		if _, ok := usedImageIDSet[key]; !ok {
			delete(dc.imageCache, key)
		}
	}
	dc.imageLock.Unlock()
}

func dockerCenterRecover() {
	if err := recover(); err != nil {
		trace := make([]byte, 2048)
		runtime.Stack(trace, true)
		logger.Error(context.Background(), "PLUGIN_RUNTIME_ALARM", "docker center runtime error", err, "stack", string(trace))
	}
}

func (dc *DockerCenter) initClient() error {
	var err error
	// do not call CreateDockerClient multiple times
	if dc.client == nil {
		if dc.client, err = CreateDockerClient(); err != nil {
			dc.setLastError(err, "init docker client from env error")
			return err
		}
	}
	return nil
}
func (dc *DockerCenter) eventListener() {
	errorCount := 0
	defer dockerCenterRecover()

	timer := time.NewTimer(EventListenerTimeout)
	var err error
	for {
		logger.Info(context.Background(), "docker event listener", "start")
		ctx, cancel := context.WithCancel(context.Background())
		events, errors := dc.client.Events(ctx, types.EventsOptions{})
		breakFlag := false
		for !breakFlag {
			timer.Reset(EventListenerTimeout)
			select {
			case event, ok := <-events:
				if !ok {
					logger.Errorf(context.Background(), "DOCKER_EVENT_ALARM", "docker event listener stop")
					errorCount++
					breakFlag = true
					break
				}
				logger.Debug(context.Background(), "docker event captured", event)
				errorCount = 0
				switch event.Status {
				case "start", "restart":
					_ = dc.fetchOne(event.ID, false)
				case "rename":
					_ = dc.fetchOne(event.ID, false)
				case "die":
					dc.markRemove(event.ID)
				default:
				}
				dc.eventChanLock.Lock()
				if dc.eventChan != nil {
					// insert without blocking
					select {
					case dc.eventChan <- event:
					default:
						logger.Error(context.Background(), "DOCKER_EVENT_ALARM", "event queue is full, miss event", event)
					}
				}
				dc.eventChanLock.Unlock()
			case err = <-errors:
				logger.Error(context.Background(), "DOCKER_EVENT_ALARM", "docker event listener error", err)
				breakFlag = true
			case <-timer.C:
				logger.Errorf(context.Background(), "DOCKER_EVENT_ALARM", "no docker event in 1 hour. Reset event listener")
				breakFlag = true
			}
		}
		cancel()
		if errorCount > 10 && criRuntimeWrapper != nil {
			logger.Info(context.Background(), "docker listener fails and cri runtime wrapper is valid", "stop docker listener")
			break
		}
		// if it keeps failing, sleep 300 seconds; otherwise retry after 10 seconds
		if errorCount > 30 {
			time.Sleep(time.Duration(300) * time.Second)
		} else {
			time.Sleep(time.Duration(10) * time.Second)
		}
	}
	dc.setLastError(err, "docker event stream closed")
}