plugins/input/docker/logmeta/metric_container_info.go (424 lines of code) (raw):

// Copyright 2021 iLogtail Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. //go:build linux || windows // +build linux windows package logmeta import ( "encoding/json" "fmt" "os" "path/filepath" "reflect" "regexp" "strings" "time" "github.com/docker/docker/api/types" "github.com/alibaba/ilogtail/pkg/helper" "github.com/alibaba/ilogtail/pkg/logger" "github.com/alibaba/ilogtail/pkg/logtail" "github.com/alibaba/ilogtail/pkg/pipeline" "github.com/alibaba/ilogtail/pkg/selfmonitor" "github.com/alibaba/ilogtail/pkg/util" ) const ( PluginDockerUpdateFile = 1 PluginDockerDeleteFile = 2 PluginDockerUpdateFileAll = 3 PluginDockerStopFile = 4 ) type Mount struct { Source string Destination string } type DockerFileUpdateCmd struct { ID string Tags []string // 用户自定义Tag MetaDatas []string // 容器信息 Mounts []Mount // 容器挂载路径 UpperDir string // 容器默认路径 LogPath string // 标准输出路径 } type ContainerInfoCache struct { Mounts []types.MountPoint UpperDir string LogPath string } type DockerFileUpdateCmdAll struct { AllCmd []DockerFileUpdateCmd } type InputDockerFile struct { IncludeLabel map[string]string // Deprecated: use IncludeContainerLabel and IncludeK8sLabel instead. ExcludeLabel map[string]string // Deprecated: use ExcludeContainerLabel and ExcludeK8sLabel instead. IncludeEnv map[string]string ExcludeEnv map[string]string IncludeContainerLabel map[string]string ExcludeContainerLabel map[string]string IncludeK8sLabel map[string]string ExcludeK8sLabel map[string]string ExternalEnvTag map[string]string ExternalK8sLabelTag map[string]string LogPath string FilePattern string MountPath string HostFlag bool K8sNamespaceRegex string K8sPodRegex string K8sContainerRegex string // export from ilogtail-trace component IncludeLabelRegex map[string]*regexp.Regexp ExcludeLabelRegex map[string]*regexp.Regexp IncludeEnvRegex map[string]*regexp.Regexp ExcludeEnvRegex map[string]*regexp.Regexp K8sFilter *helper.K8SFilter lastContainerInfoCache map[string]ContainerInfoCache FlushIntervalMs int `comment:"the interval of container discovery, and the timeunit is millisecond. Default value is 3000."` context pipeline.Context lastClearTime time.Time updateEmptyFlag bool avgInstanceMetric selfmonitor.CounterMetric addMetric selfmonitor.CounterMetric updateMetric selfmonitor.CounterMetric deleteMetric selfmonitor.CounterMetric lastUpdateTime int64 // Last return of GetAllAcceptedInfoV2 fullList map[string]bool matchList map[string]*helper.DockerInfoDetail CollectingContainersMeta bool firstStart bool forceReleaseStopContainerFile bool } func formatPath(path string) string { if len(path) == 0 { return path } path = filepath.Clean(path) if path[len(path)-1] == '/' { return path[0 : len(path)-1] } if path[len(path)-1] == '\\' { return path[0 : len(path)-1] } return path } func (idf *InputDockerFile) Name() string { return "InputDockerFile" } func (idf *InputDockerFile) Init(context pipeline.Context) (int, error) { idf.context = context idf.forceReleaseStopContainerFile = os.Getenv("FORCE_RELEASE_STOP_CONTAINER_FILE") == "true" idf.lastContainerInfoCache = make(map[string]ContainerInfoCache) idf.firstStart = true idf.fullList = make(map[string]bool) idf.matchList = make(map[string]*helper.DockerInfoDetail) // Because docker on Windows will convert all mounted path to lowercase (see // Mounts field in output of docker inspect), so we have to change LogPath to // lowercase if it is a Windows path (with colon). idf.LogPath = formatPath(idf.LogPath) if colonPos := strings.Index(idf.LogPath, ":"); colonPos != -1 { idf.LogPath = strings.ToLower(idf.LogPath) } idf.lastClearTime = time.Now() helper.ContainerCenterInit() if idf.HostFlag { idf.MountPath = "" } else if envPath := os.Getenv("ALIYUN_LOGTAIL_MOUNT_PATH"); len(envPath) > 0 { if envPath[len(envPath)-1] == '/' || envPath[len(envPath)-1] == '\\' { envPath = envPath[0 : len(envPath)-1] } idf.MountPath = envPath } else { idf.MountPath = helper.DefaultLogtailMountPath } idf.updateEmptyFlag = true metricsRecord := idf.context.GetMetricRecord() idf.avgInstanceMetric = selfmonitor.NewAverageMetricAndRegister(metricsRecord, selfmonitor.MetricPluginContainerTotal) idf.addMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginAddContainerTotal) idf.deleteMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginRemoveContainerTotal) idf.updateMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginUpdateContainerTotal) var err error idf.IncludeEnv, idf.IncludeEnvRegex, err = helper.SplitRegexFromMap(idf.IncludeEnv) if err != nil { logger.Warning(idf.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include env regex error", err) } idf.ExcludeEnv, idf.ExcludeEnvRegex, err = helper.SplitRegexFromMap(idf.ExcludeEnv) if err != nil { logger.Warning(idf.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude env regex error", err) } if idf.IncludeLabel != nil { for k, v := range idf.IncludeContainerLabel { idf.IncludeLabel[k] = v } } else { idf.IncludeLabel = idf.IncludeContainerLabel } if idf.ExcludeLabel != nil { for k, v := range idf.ExcludeContainerLabel { idf.ExcludeLabel[k] = v } } else { idf.ExcludeLabel = idf.ExcludeContainerLabel } idf.IncludeLabel, idf.IncludeLabelRegex, err = helper.SplitRegexFromMap(idf.IncludeLabel) if err != nil { logger.Warning(idf.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include label regex error", err) } idf.ExcludeLabel, idf.ExcludeLabelRegex, err = helper.SplitRegexFromMap(idf.ExcludeLabel) if err != nil { logger.Warning(idf.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude label regex error", err) } idf.K8sFilter, err = helper.CreateK8SFilter(idf.K8sNamespaceRegex, idf.K8sPodRegex, idf.K8sContainerRegex, idf.IncludeK8sLabel, idf.ExcludeK8sLabel) logger.Debugf(idf.context.GetRuntimeContext(), "InputDockerFile inited successfully") return idf.FlushIntervalMs, err } func (idf *InputDockerFile) Description() string { return "docker file plugin for logtail" } // addMappingToLogtail 添加容器信息到allCmd里面,allCmd不为nil时,只添加不执行,allCmd为nil时,添加并执行 func (idf *InputDockerFile) addMappingToLogtail(info *helper.DockerInfoDetail, containerInfo ContainerInfoCache, allCmd *DockerFileUpdateCmdAll) { var cmd DockerFileUpdateCmd cmd.ID = info.ContainerInfo.ID cmd.UpperDir = filepath.Clean(containerInfo.UpperDir) cmd.LogPath = filepath.Clean(containerInfo.LogPath) // tags tags := info.GetExternalTags(idf.ExternalEnvTag, idf.ExternalK8sLabelTag) cmd.Tags = make([]string, 0, len(tags)*2) for key, val := range tags { cmd.Tags = append(cmd.Tags, key) cmd.Tags = append(cmd.Tags, val) } // info.ContainerNameTag cmd.MetaDatas = make([]string, 0, len(info.ContainerNameTag)*2) for key, val := range info.ContainerNameTag { cmd.MetaDatas = append(cmd.MetaDatas, key) cmd.MetaDatas = append(cmd.MetaDatas, val) } cmd.Mounts = make([]Mount, 0, len(containerInfo.Mounts)) for _, mount := range containerInfo.Mounts { cmd.Mounts = append(cmd.Mounts, Mount{ Source: filepath.Clean(mount.Source), Destination: filepath.Clean(mount.Destination), }) } cmdBuf, _ := json.Marshal(&cmd) configName := idf.context.GetConfigName() logger.Info(idf.context.GetRuntimeContext(), "addMappingToLogtail cmd", cmd) if allCmd != nil { allCmd.AllCmd = append(allCmd.AllCmd, cmd) return } if err := logtail.ExecuteCMD(configName, PluginDockerUpdateFile, cmdBuf); err != nil { logger.Error(idf.context.GetRuntimeContext(), "DOCKER_FILE_MAPPING_ALARM", "cmdType", PluginDockerUpdateFile, "cmd", cmdBuf, "error", err) } } // deleteMappingFromLogtail 把需要删除的容器信息的id 发送给c++ func (idf *InputDockerFile) deleteMappingFromLogtail(id string) { var cmd DockerFileUpdateCmd cmd.ID = id logger.Info(idf.context.GetRuntimeContext(), "deleteMappingFromLogtail cmd", cmd) cmdBuf, _ := json.Marshal(&cmd) configName := idf.context.GetConfigName() if err := logtail.ExecuteCMD(configName, PluginDockerDeleteFile, cmdBuf); err != nil { logger.Error(idf.context.GetRuntimeContext(), "DOCKER_FILE_MAPPING_ALARM", "cmdType", PluginDockerDeleteFile, "cmd", cmdBuf, "error", err) } } // notifyStopToLogtail 通知c++ 该容器已经停止 func (idf *InputDockerFile) notifyStopToLogtail(id string) { var cmd DockerFileUpdateCmd cmd.ID = id logger.Info(idf.context.GetRuntimeContext(), "notifyStopToLogtail cmd", cmd) cmdBuf, _ := json.Marshal(&cmd) configName := idf.context.GetConfigName() if err := logtail.ExecuteCMD(configName, PluginDockerStopFile, cmdBuf); err != nil { logger.Error(idf.context.GetRuntimeContext(), "DOCKER_FILE_MAPPING_ALARM", "cmdType", PluginDockerStopFile, "cmd", cmdBuf, "error", err) } } // updateAll 更新所有容器信息 func (idf *InputDockerFile) updateAll(allCmd *DockerFileUpdateCmdAll) { logger.Info(idf.context.GetRuntimeContext(), "update all", len(allCmd.AllCmd)) cmdBuf, _ := json.Marshal(allCmd) configName := idf.context.GetConfigName() if err := logtail.ExecuteCMD(configName, PluginDockerUpdateFileAll, cmdBuf); err != nil { logger.Error(idf.context.GetRuntimeContext(), "DOCKER_FILE_MAPPING_ALARM", "cmdType", PluginDockerUpdateFileAll, "cmd", cmdBuf, "error", err) } } func (idf *InputDockerFile) updateMapping(info *helper.DockerInfoDetail, allCmd *DockerFileUpdateCmdAll) { logPath := filepath.Clean(info.StdoutPath) id := info.ContainerInfo.ID mounts := info.ContainerInfo.Mounts upperDir := info.DefaultRootPath changed := false // logPath if val, ok := idf.lastContainerInfoCache[id]; ok && val.LogPath != logPath { // send delete first and then add this info logger.Info(idf.context.GetRuntimeContext(), "container logPath", "changed", "last", val, "logPath", logPath, "id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status()) changed = true } else if !ok { logger.Info(idf.context.GetRuntimeContext(), "container logPath", "added", "logPath", logPath, "id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status()) changed = true } // upperDir if !changed { if val, ok := idf.lastContainerInfoCache[id]; ok && val.UpperDir != upperDir { // send delete first and then add this info logger.Info(idf.context.GetRuntimeContext(), "container upperDir", "changed", "last", val, "upperDir", upperDir, "id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status()) changed = true } else if !ok { logger.Info(idf.context.GetRuntimeContext(), "container upperDir", "added", "upperDir", upperDir, "id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status()) changed = true } } // 判断mounts if !changed { if val, ok := idf.lastContainerInfoCache[id]; ok && !reflect.DeepEqual(val.Mounts, mounts) { // send delete first and then add this info logger.Info(idf.context.GetRuntimeContext(), "container mounts", "changed", "last", val, "mounts", mounts, "id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status()) changed = true } else if !ok { logger.Info(idf.context.GetRuntimeContext(), "container mounts", "added", "mounts", mounts, "id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status()) changed = true } } if changed { idf.updateMetric.Add(1) newContainerInfoCache := ContainerInfoCache{ Mounts: mounts, UpperDir: upperDir, LogPath: logPath, } idf.lastContainerInfoCache[id] = newContainerInfoCache idf.addMappingToLogtail(info, newContainerInfoCache, allCmd) } } // deleteMapping 删除容器信息 func (idf *InputDockerFile) deleteMapping(id string) { idf.deleteMappingFromLogtail(id) logger.Info(idf.context.GetRuntimeContext(), "container mapping", "deleted", "id", helper.GetShortID(id), "logPath", idf.lastContainerInfoCache[id].LogPath, "upperDir", idf.lastContainerInfoCache[id].UpperDir, "mounts", idf.lastContainerInfoCache[id].Mounts) delete(idf.lastContainerInfoCache, id) } // notifyStop 通知容器停止 func (idf *InputDockerFile) notifyStop(id string) { idf.notifyStopToLogtail(id) logger.Info(idf.context.GetRuntimeContext(), "container mapping", "stopped", "id", helper.GetShortID(id), "logPath", idf.lastContainerInfoCache[id].LogPath, "upperDir", idf.lastContainerInfoCache[id].UpperDir, "mounts", idf.lastContainerInfoCache[id].Mounts) } func (idf *InputDockerFile) Collect(collector pipeline.Collector) error { newUpdateTime := helper.GetContainersLastUpdateTime() if idf.lastUpdateTime != 0 { // Nothing update, just skip. if idf.lastUpdateTime >= newUpdateTime { return nil } } var allCmd *DockerFileUpdateCmdAll allCmd = nil // if cache is empty, use update all cmd if len(idf.lastContainerInfoCache) == 0 { allCmd = new(DockerFileUpdateCmdAll) } newCount, delCount, addResultList, deleteResultList := helper.GetContainerByAcceptedInfoV2( idf.fullList, idf.matchList, idf.IncludeLabel, idf.ExcludeLabel, idf.IncludeLabelRegex, idf.ExcludeLabelRegex, idf.IncludeEnv, idf.ExcludeEnv, idf.IncludeEnvRegex, idf.ExcludeEnvRegex, idf.K8sFilter) idf.lastUpdateTime = newUpdateTime // record config result havingPathkeys := make([]string, 0) nothavingPathkeys := make([]string, 0) if newCount != 0 || delCount != 0 { logger.Infof(idf.context.GetRuntimeContext(), "update match list, new: %v, delete: %v", newCount, delCount) // Can not return here because we should notify empty update to clear // cache in docker_path_helper.json. } else { logger.Debugf(idf.context.GetRuntimeContext(), "update match list, new: %v, delete: %v", newCount, delCount) } dockerInfoDetails := idf.matchList logger.Debug(idf.context.GetRuntimeContext(), "match list length", len(dockerInfoDetails)) idf.avgInstanceMetric.Add(int64(len(dockerInfoDetails))) for k, info := range dockerInfoDetails { if len(idf.LogPath) > 0 && info.ContainerInfo.State.Status == helper.ContainerStatusRunning { // inputFile idf.updateMapping(info, allCmd) } else if len(idf.LogPath) == 0 { // stdout idf.updateMapping(info, allCmd) } // 容器元信息预览使用 if idf.CollectingContainersMeta && len(idf.LogPath) > 0 { sourcePath, containerPath := info.FindBestMatchedPath(idf.LogPath) formatSourcePath := formatPath(sourcePath) formateContainerPath := formatPath(containerPath) destPath := helper.GetMountedFilePathWithBasePath(idf.MountPath, formatSourcePath) + idf.LogPath[len(formateContainerPath):] if ok, err := util.PathExists(destPath); err == nil { if !ok { nothavingPathkeys = append(nothavingPathkeys, helper.GetShortID(k)) } else { havingPathkeys = append(havingPathkeys, helper.GetShortID(k)) } } else { nothavingPathkeys = append(nothavingPathkeys, helper.GetShortID(k)) } } } if idf.CollectingContainersMeta { var configResult *helper.ContainerConfigResult if len(idf.LogPath) == 0 { keys := make([]string, 0, len(idf.matchList)) for k := range idf.matchList { if len(k) > 0 { keys = append(keys, helper.GetShortID(k)) } } configResult = &helper.ContainerConfigResult{ DataType: "container_config_result", Project: idf.context.GetProject(), Logstore: idf.context.GetLogstore(), ConfigName: idf.context.GetConfigName(), PathExistInputContainerIDs: helper.GetStringFromList(keys), SourceAddress: "stdout", InputType: "input_container_stdio", FlusherType: "flusher_sls", FlusherTargetAddress: fmt.Sprintf("%s/%s", idf.context.GetProject(), idf.context.GetLogstore()), } } else { configResult = &helper.ContainerConfigResult{ DataType: "container_config_result", Project: idf.context.GetProject(), Logstore: idf.context.GetLogstore(), ConfigName: idf.context.GetConfigName(), SourceAddress: fmt.Sprintf("%s/**/%s", idf.LogPath, idf.FilePattern), PathExistInputContainerIDs: helper.GetStringFromList(havingPathkeys), PathNotExistInputContainerIDs: helper.GetStringFromList(nothavingPathkeys), InputType: "file_log", InputIsContainerFile: "true", FlusherType: "flusher_sls", FlusherTargetAddress: fmt.Sprintf("%s/%s", idf.context.GetProject(), idf.context.GetLogstore()), } } helper.RecordContainerConfigResultMap(configResult) if newCount != 0 || delCount != 0 || idf.firstStart { helper.RecordContainerConfigResultIncrement(configResult) idf.firstStart = false } logger.Debugf(idf.context.GetRuntimeContext(), "update match list, addResultList: %v, deleteResultList: %v", addResultList, deleteResultList) } for id := range idf.lastContainerInfoCache { if c, ok := dockerInfoDetails[id]; !ok { idf.deleteMetric.Add(1) idf.notifyStop(id) idf.deleteMapping(id) } else if c.Status() != helper.ContainerStatusRunning && len(idf.LogPath) > 0 { // input_file时会触发 if idf.forceReleaseStopContainerFile { idf.deleteMetric.Add(1) idf.notifyStop(id) idf.deleteMapping(id) } else { idf.notifyStop(id) } } } if allCmd != nil { if len(allCmd.AllCmd) == 0 { // only update empty if empty flag is true if idf.updateEmptyFlag { idf.updateAll(allCmd) idf.updateEmptyFlag = false } } else { idf.updateAll(allCmd) idf.updateEmptyFlag = true } } if time.Since(idf.lastClearTime) > time.Hour { idf.lastContainerInfoCache = make(map[string]ContainerInfoCache) idf.lastClearTime = time.Now() } return nil } func init() { pipeline.MetricInputs["metric_container_info"] = func() pipeline.MetricInput { return &InputDockerFile{ FlushIntervalMs: 3000, } } }