plugins/input/docker/stdout/input_docker_stdout.go (367 lines of code) (raw):

// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stdout

import (
	"fmt"
	"regexp"
	"sync"
	"time"

	"github.com/docker/docker/api/types"

	"github.com/alibaba/ilogtail/pkg/helper"
	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/selfmonitor"
	"github.com/alibaba/ilogtail/pkg/util"
	"github.com/alibaba/ilogtail/plugins/input"
)

// serviceDockerStdoutKey is the key under which this plugin persists and
// reloads its checkpoint map via the pipeline context.
const serviceDockerStdoutKey = "service_docker_stdout_v2"

// logDriverSupported reports whether the container's log driver produces a
// log file this plugin can tail. Only the docker "json-file" driver is
// accepted; a nil HostConfig is treated as containerd and accepted as well.
func logDriverSupported(container types.ContainerJSON) bool {
	// containerd has no hostConfig, return true
	if container.HostConfig == nil {
		return true
	}
	switch container.HostConfig.LogConfig.Type {
	case "json-file":
		return true
	default:
		return false
	}
}

// logPathEmpty reports whether the container exposes no stdout log file path.
func logPathEmpty(container types.ContainerJSON) bool {
	return len(container.LogPath) == 0
}

// DockerFileSyner bundles the file reader tailing one container's stdout log
// file with the processor that parses its lines, plus the container metadata
// the pair was created for.
type DockerFileSyner struct {
	dockerFileReader    *helper.LogFileReader
	dockerFileProcessor *DockerStdoutProcessor
	info                *helper.DockerInfoDetail
}

// NewDockerFileSynerByFile builds a DockerFileSyner that tails a plain file
// path directly instead of a discovered container's log file.
// NOTE(review): it mutates sds (LogtailInDocker=false, StartLogMaxOffset=10GiB)
// as a side effect — presumably a debug/standalone entry point; confirm callers.
func NewDockerFileSynerByFile(sds *ServiceDockerStdout, filePath string) *DockerFileSyner {
	dockerInfoDetail := &helper.DockerInfoDetail{}
	dockerInfoDetail.ContainerInfo = types.ContainerJSON{}
	dockerInfoDetail.ContainerInfo.LogPath = filePath
	sds.LogtailInDocker = false
	sds.StartLogMaxOffset = 10 * 1024 * 1024 * 1024
	return NewDockerFileSyner(sds, dockerInfoDetail, sds.checkpointMap)
}

// NewDockerFileSyner creates the reader/processor pair for one container.
// If the container has no entry in checkpointMap it is being watched for the
// first time: the log path is resolved (through the mount path when running
// inside docker), stat'ed, and the starting offset is chosen so that at most
// StartLogMaxOffset bytes of history are replayed. The returned syner's
// reader is NOT started here; the caller starts it.
func NewDockerFileSyner(sds *ServiceDockerStdout,
	info *helper.DockerInfoDetail,
	checkpointMap map[string]helper.LogFileReaderCheckPoint) *DockerFileSyner {
	var reg *regexp.Regexp
	var err error
	if len(sds.BeginLineRegex) > 0 {
		// An invalid regex is only warned about; reg stays nil and the
		// processor falls back to single-line mode.
		if reg, err = regexp.Compile(sds.BeginLineRegex); err != nil {
			logger.Warning(sds.context.GetRuntimeContext(), "DOCKER_REGEX_COMPILE_ALARM", "compile begin line regex error, regex", sds.BeginLineRegex, "error", err)
		}
	}
	// Pack-ID source is derived from container ID + config name so log groups
	// from different containers/configs get distinct prefixes.
	source := util.NewPackIDPrefix(info.ContainerInfo.ID + sds.context.GetConfigName())
	tags := info.GetExternalTags(sds.ExternalEnvTag, sds.ExternalK8sLabelTag)
	for k, v := range info.ContainerNameTag {
		tags[k] = v
	}
	processor := NewDockerStdoutProcessor(reg, time.Duration(sds.BeginLineTimeoutMs)*time.Millisecond, sds.BeginLineCheckLength, sds.MaxLogSize, sds.Stdout, sds.Stderr, sds.context, sds.collector, tags, source)

	// checkpoint is the zero value when this container has never been seen;
	// in that case compute its path and initial offset from scratch.
	checkpoint, ok := checkpointMap[info.ContainerInfo.ID]
	if !ok {
		if sds.LogtailInDocker {
			// Running inside a container: translate the host log path to the
			// mounted path visible to this process.
			checkpoint.Path = helper.GetMountedFilePath(info.ContainerInfo.LogPath)
		} else {
			checkpoint.Path = info.ContainerInfo.LogPath
		}
		// first watch this container
		realPath, stat := helper.TryGetRealPath(checkpoint.Path)
		if realPath == "" {
			logger.Warning(sds.context.GetRuntimeContext(), "DOCKER_STDOUT_STAT_ALARM", "stat log file error, path", checkpoint.Path, "error", "path not found")
		} else {
			checkpoint.Offset = stat.Size()
			if checkpoint.Offset > sds.StartLogMaxOffset {
				// File larger than the history budget: start so that only the
				// last StartLogMaxOffset bytes are read.
				logger.Warning(sds.context.GetRuntimeContext(), "DOCKER_STDOUT_START_ALARM", "log file too big, path", checkpoint.Path, "offset", checkpoint.Offset)
				checkpoint.Offset -= sds.StartLogMaxOffset
			} else {
				// Small file: replay it from the beginning.
				checkpoint.Offset = 0
			}
			checkpoint.State = helper.GetOSState(stat)
			checkpoint.Path = realPath
		}
	}

	// Enforce a lower bound so files are not closed too aggressively.
	if sds.CloseUnChangedSec < 10 {
		sds.CloseUnChangedSec = 10
	}
	logger.Info(sds.context.GetRuntimeContext(), "new stdout reader id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status(), "checkpoint_logpath", checkpoint.Path, "in_docker", sds.LogtailInDocker)
	config := helper.LogFileReaderConfig{
		ReadIntervalMs:   sds.ReadIntervalMs,
		MaxReadBlockSize: sds.MaxLogSize,
		CloseFileSec:     sds.CloseUnChangedSec,
		Tracker:          sds.tracker,
	}
	reader, _ := helper.NewLogFileReader(sds.context.GetRuntimeContext(), checkpoint, config, processor)
	return &DockerFileSyner{
		dockerFileReader:    reader,
		info:                info,
		dockerFileProcessor: processor,
	}
}

// ServiceDockerStdout is the service input plugin that discovers containers
// matching the configured label/env/k8s filters and tails their stdout/stderr
// log files, maintaining one DockerFileSyner per matched container.
type ServiceDockerStdout struct {
	IncludeLabel          map[string]string `comment:"include container label for selector. [Deprecated: use IncludeContainerLabel and IncludeK8sLabel instead]"`
	ExcludeLabel          map[string]string `comment:"exclude container label for selector. [Deprecated: use ExcludeContainerLabel and ExcludeK8sLabel instead]"`
	IncludeEnv            map[string]string `comment:"the container would be selected when it is matched by any environment rules. Furthermore, the regular expression starts with '^' is supported as the env value, such as 'ENVA:^DE.*$'' would hit all containers having any envs starts with DE."`
	ExcludeEnv            map[string]string `comment:"the container would be excluded when it is matched by any environment rules. Furthermore, the regular expression starts with '^' is supported as the env value, such as 'ENVA:^DE.*$'' would hit all containers having any envs starts with DE."`
	IncludeContainerLabel map[string]string `comment:"the container would be selected when it is matched by any container labels. Furthermore, the regular expression starts with '^' is supported as the label value, such as 'LABEL:^DE.*$'' would hit all containers having any labels starts with DE."`
	ExcludeContainerLabel map[string]string `comment:"the container would be excluded when it is matched by any container labels. Furthermore, the regular expression starts with '^' is supported as the label value, such as 'LABEL:^DE.*$'' would hit all containers having any labels starts with DE."`
	IncludeK8sLabel       map[string]string `comment:"the container of pod would be selected when it is matched by any include k8s label rules. Furthermore, the regular expression starts with '^' is supported as the value to match pods."`
	ExcludeK8sLabel       map[string]string `comment:"the container of pod would be excluded when it is matched by any exclude k8s label rules. Furthermore, the regular expression starts with '^' is supported as the value to exclude pods."`
	ExternalEnvTag        map[string]string `comment:"extract the env value as the log tags for one container, such as the value of ENVA would be appended to the 'taga' of log tags when configured 'ENVA:taga' pair."`
	ExternalK8sLabelTag   map[string]string `comment:"extract the pod label value as the log tags for one container, such as the value of LABELA would be appended to the 'taga' of log tags when configured 'LABELA:taga' pair."`
	FlushIntervalMs       int               `comment:"the interval of container discovery, and the timeunit is millisecond. Default value is 3000."`
	ReadIntervalMs        int               `comment:"the interval of read stdout log, and the timeunit is millisecond. Default value is 1000."`
	SaveCheckPointSec     int               `comment:"the interval of save checkpoint, and the timeunit is second. Default value is 60."`
	BeginLineRegex        string            `comment:"the regular expression of begin line for the multi line log."`
	BeginLineTimeoutMs    int               `comment:"the maximum timeout milliseconds for begin line match. Default value is 3000."`
	BeginLineCheckLength  int               `comment:"the prefix length of log line to match the first line. Default value is 10240."`
	MaxLogSize            int               `comment:"the maximum log size. Default value is 512*1024, a.k.a 512K."`
	CloseUnChangedSec     int               `comment:"the reading file would be close when the interval between last read operation is over {CloseUnChangedSec} seconds. Default value is 60."`
	StartLogMaxOffset     int64             `comment:"the first read operation would read {StartLogMaxOffset} size history logs. Default value is 128*1024, a.k.a 128K."`
	Stdout                bool              `comment:"collect stdout log. Default is true."`
	Stderr                bool              `comment:"collect stderr log. Default is true."`
	LogtailInDocker       bool              `comment:"the logtail running mode. Default is true."`
	K8sNamespaceRegex     string            `comment:"the regular expression of kubernetes namespace to match containers."`
	K8sPodRegex           string            `comment:"the regular expression of kubernetes pod to match containers."`
	K8sContainerRegex     string            `comment:"the regular expression of kubernetes container to match containers."`

	// export from ilogtail-trace component
	// Compiled regex halves of the Include/Exclude maps, split out by
	// helper.SplitRegexFromMap in Init.
	IncludeLabelRegex map[string]*regexp.Regexp
	ExcludeLabelRegex map[string]*regexp.Regexp
	IncludeEnvRegex   map[string]*regexp.Regexp
	ExcludeEnvRegex   map[string]*regexp.Regexp
	K8sFilter         *helper.K8SFilter
	// for tracker
	tracker           *helper.ReaderMetricTracker
	avgInstanceMetric selfmonitor.CounterMetric
	addMetric         selfmonitor.CounterMetric
	deleteMetric      selfmonitor.CounterMetric

	// synerMap maps container ID -> active reader; checkpointMap holds the
	// persisted read positions keyed by the same IDs.
	synerMap      map[string]*DockerFileSyner
	checkpointMap map[string]helper.LogFileReaderCheckPoint
	shutdown      chan struct{}
	waitGroup     sync.WaitGroup
	context       pipeline.Context
	collector     pipeline.Collector

	// Last return of GetAllAcceptedInfoV2
	fullList              map[string]bool
	matchList             map[string]*helper.DockerInfoDetail
	lastUpdateTime        int64
	CollectContainersFlag bool
}

// Init prepares the plugin: registers self-monitor metrics, normalizes
// MaxLogSize into [1024, 20MiB], merges the deprecated IncludeLabel/
// ExcludeLabel maps with the container-label maps, splits regex-valued
// filter entries into their own maps, and builds the k8s filter.
// It returns (0, err) where err is the k8s filter construction error, if any.
func (sds *ServiceDockerStdout) Init(context pipeline.Context) (int, error) {
	sds.context = context
	helper.ContainerCenterInit()
	sds.fullList = make(map[string]bool)
	sds.matchList = make(map[string]*helper.DockerInfoDetail)
	sds.synerMap = make(map[string]*DockerFileSyner)

	// Clamp MaxLogSize to a sane range (1KiB .. 20MiB).
	if sds.MaxLogSize < 1024 {
		sds.MaxLogSize = 1024
	}
	if sds.MaxLogSize > 1024*1024*20 {
		sds.MaxLogSize = 1024 * 1024 * 20
	}
	metricsRecord := sds.context.GetMetricRecord()
	sds.tracker = helper.NewReaderMetricTracker(metricsRecord)
	sds.avgInstanceMetric = selfmonitor.NewAverageMetricAndRegister(metricsRecord, selfmonitor.MetricPluginContainerTotal)
	sds.addMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginAddContainerTotal)
	sds.deleteMetric = selfmonitor.NewCounterMetricAndRegister(metricsRecord, selfmonitor.MetricPluginRemoveContainerTotal)

	// Bad regexes in env filters are logged but do not fail Init.
	var err error
	sds.IncludeEnv, sds.IncludeEnvRegex, err = helper.SplitRegexFromMap(sds.IncludeEnv)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include env regex error", err)
	}
	sds.ExcludeEnv, sds.ExcludeEnvRegex, err = helper.SplitRegexFromMap(sds.ExcludeEnv)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude env regex error", err)
	}
	// Merge the new IncludeContainerLabel/ExcludeContainerLabel settings into
	// the deprecated IncludeLabel/ExcludeLabel maps so both spellings work.
	if sds.IncludeLabel != nil {
		for k, v := range sds.IncludeContainerLabel {
			sds.IncludeLabel[k] = v
		}
	} else {
		sds.IncludeLabel = sds.IncludeContainerLabel
	}
	if sds.ExcludeLabel != nil {
		for k, v := range sds.ExcludeContainerLabel {
			sds.ExcludeLabel[k] = v
		}
	} else {
		sds.ExcludeLabel = sds.ExcludeContainerLabel
	}
	sds.IncludeLabel, sds.IncludeLabelRegex, err = helper.SplitRegexFromMap(sds.IncludeLabel)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include label regex error", err)
	}
	sds.ExcludeLabel, sds.ExcludeLabelRegex, err = helper.SplitRegexFromMap(sds.ExcludeLabel)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude label regex error", err)
	}
	sds.K8sFilter, err = helper.CreateK8SFilter(sds.K8sNamespaceRegex, sds.K8sPodRegex, sds.K8sContainerRegex, sds.IncludeK8sLabel, sds.ExcludeK8sLabel)
	return 0, err
}

// Description returns a human-readable summary of the plugin.
func (sds *ServiceDockerStdout) Description() string {
	return "the container stdout input plugin for iLogtail, which supports docker and containerd."
}

// Collect is a no-op: this is a service input that pushes data through the
// collector handed to Start, not a pull-style metric input.
func (sds *ServiceDockerStdout) Collect(pipeline.Collector) error {
	return nil
}

// FlushAll refreshes the matched-container set and reconciles synerMap with
// it: starting a reader for each newly matched container and stopping the
// reader of each container that no longer matches. firstStart forces a full
// (re)build even when the diff is empty. It short-circuits when the container
// center has not been updated since the last call.
func (sds *ServiceDockerStdout) FlushAll(c pipeline.Collector, firstStart bool) error {
	// Skip entirely if container info has not changed since the last flush.
	newUpdateTime := helper.GetContainersLastUpdateTime()
	if sds.lastUpdateTime != 0 {
		if sds.lastUpdateTime >= newUpdateTime {
			return nil
		}
	}
	var err error
	// Diff the accepted container set against fullList/matchList in place.
	newCount, delCount, addResultList, deleteResultList := helper.GetContainerByAcceptedInfoV2(
		sds.fullList, sds.matchList, sds.IncludeLabel, sds.ExcludeLabel, sds.IncludeLabelRegex, sds.ExcludeLabelRegex, sds.IncludeEnv, sds.ExcludeEnv, sds.IncludeEnvRegex, sds.ExcludeEnvRegex, sds.K8sFilter)
	sds.lastUpdateTime = newUpdateTime

	if sds.CollectContainersFlag {
		// record config result
		{
			keys := make([]string, 0, len(sds.matchList))
			for k := range sds.matchList {
				if len(k) > 0 {
					keys = append(keys, helper.GetShortID(k))
				}
			}
			configResult := &helper.ContainerConfigResult{
				DataType:                   "container_config_result",
				Project:                    sds.context.GetProject(),
				Logstore:                   sds.context.GetLogstore(),
				ConfigName:                 sds.context.GetConfigName(),
				PathExistInputContainerIDs: helper.GetStringFromList(keys),
				SourceAddress:              "stdout",
				InputType:                  input.ServiceDockerStdoutPluginName,
				FlusherType:                "flusher_sls",
				FlusherTargetAddress:       fmt.Sprintf("%s/%s", sds.context.GetProject(), sds.context.GetLogstore()),
			}
			helper.RecordContainerConfigResultMap(configResult)
			// Only record an increment when something actually changed (or on
			// the first run).
			if newCount != 0 || delCount != 0 || firstStart {
				helper.RecordContainerConfigResultIncrement(configResult)
			}
			logger.Debugf(sds.context.GetRuntimeContext(), "update match list, addResultList: %v, deleteResultList: %v", addResultList, deleteResultList)
		}
	}

	// Nothing added or removed and not the first start: nothing to reconcile.
	if !firstStart && newCount == 0 && delCount == 0 {
		logger.Debugf(sds.context.GetRuntimeContext(), "update match list, firstStart: %v, new: %v, delete: %v", firstStart, newCount, delCount)
		return nil
	}
	logger.Infof(sds.context.GetRuntimeContext(), "update match list, firstStart: %v, new: %v, delete: %v", firstStart, newCount, delCount)

	dockerInfos := sds.matchList
	logger.Debug(sds.context.GetRuntimeContext(), "match list length", len(dockerInfos))
	sds.avgInstanceMetric.Add(int64(len(dockerInfos)))
	// Start a syner for every matched container that has a tailable log file
	// and no running reader yet (or unconditionally on first start).
	for id, info := range dockerInfos {
		if !logDriverSupported(info.ContainerInfo) {
			continue
		}
		if logPathEmpty(info.ContainerInfo) {
			continue
		}
		if _, ok := sds.synerMap[id]; !ok || firstStart {
			syner := NewDockerFileSyner(sds, info, sds.checkpointMap)
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout", "added", "source host path", info.ContainerInfo.LogPath, "id", info.IDPrefix(), "name", info.ContainerInfo.Name, "created", info.ContainerInfo.Created, "status", info.Status())
			sds.addMetric.Add(1)
			sds.synerMap[id] = syner
			syner.dockerFileReader.Start()
		}
	}

	// delete container
	// Stop and drop readers for containers that disappeared from the match list.
	for id, syner := range sds.synerMap {
		if _, ok := dockerInfos[id]; !ok {
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout", "deleted", "id", helper.GetShortID(id), "name", syner.info.ContainerInfo.Name)
			syner.dockerFileReader.Stop()
			delete(sds.synerMap, id)
			sds.deleteMetric.Add(1)
		}
	}

	return err
}

// SaveCheckPoint snapshots every active reader's checkpoint into
// checkpointMap and persists the map when any checkpoint changed, or always
// when force is true.
func (sds *ServiceDockerStdout) SaveCheckPoint(force bool) error {
	checkpointChanged := false
	for id, syner := range sds.synerMap {
		checkpoint, changed := syner.dockerFileReader.GetCheckpoint()
		if changed {
			checkpointChanged = true
		}
		sds.checkpointMap[id] = checkpoint
	}
	if !force && !checkpointChanged {
		logger.Debug(sds.context.GetRuntimeContext(), "no need to save checkpoint, checkpoint size", len(sds.checkpointMap))
		return nil
	}
	logger.Debug(sds.context.GetRuntimeContext(), "save checkpoint, checkpoint size", len(sds.checkpointMap))
	return sds.context.SaveCheckPointObject(serviceDockerStdoutKey, sds.checkpointMap)
}

// LoadCheckPoint restores the persisted checkpoint map once; subsequent
// calls are no-ops.
func (sds *ServiceDockerStdout) LoadCheckPoint() {
	if sds.checkpointMap != nil {
		return
	}
	sds.checkpointMap = make(map[string]helper.LogFileReaderCheckPoint)
	sds.context.GetCheckPointObject(serviceDockerStdoutKey, &sds.checkpointMap)
}

// ClearUselessCheckpoint drops checkpoints for containers that no longer
// have an active reader in synerMap.
func (sds *ServiceDockerStdout) ClearUselessCheckpoint() {
	if sds.checkpointMap == nil {
		return
	}
	for id := range sds.checkpointMap {
		if _, ok := sds.synerMap[id]; !ok {
			logger.Info(sds.context.GetRuntimeContext(), "delete checkpoint, id", id)
			delete(sds.checkpointMap, id)
		}
	}
}

// Start starts the ServiceInput's service, whatever that may be
// It runs the discovery loop in the calling goroutine: every FlushIntervalMs
// it reconciles readers via FlushAll, and every SaveCheckPointSec it saves
// checkpoints and prunes stale ones, until Stop closes the shutdown channel.
func (sds *ServiceDockerStdout) Start(c pipeline.Collector) error {
	sds.collector = c
	sds.shutdown = make(chan struct{})
	// Stop() waits on this WaitGroup to know the loop has fully exited.
	sds.waitGroup.Add(1)
	defer sds.waitGroup.Done()
	sds.LoadCheckPoint()

	lastSaveCheckPointTime := time.Now()

	_ = sds.FlushAll(c, true)
	for {
		timer := time.NewTimer(time.Duration(sds.FlushIntervalMs) * time.Millisecond)
		select {
		case <-sds.shutdown:
			// Stop every reader before returning so no goroutine outlives us.
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout main runtime stop", "begin")
			for _, syner := range sds.synerMap {
				syner.dockerFileReader.Stop()
			}
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout main runtime stop", "success")
			return nil
		case <-timer.C:
			if nowTime := time.Now(); nowTime.Sub(lastSaveCheckPointTime) > time.Second*time.Duration(sds.SaveCheckPointSec) {
				_ = sds.SaveCheckPoint(false)
				lastSaveCheckPointTime = nowTime
				sds.ClearUselessCheckpoint()
			}
			_ = sds.FlushAll(c, false)
		}
	}
}

// Stop stops the services and closes any necessary channels and connections
// It signals the Start loop, waits for it to exit, then force-saves the
// final checkpoints.
func (sds *ServiceDockerStdout) Stop() error {
	close(sds.shutdown)
	sds.waitGroup.Wait()
	// force save checkpoint
	_ = sds.SaveCheckPoint(true)
	return nil
}

// init registers the plugin factory with its documented default settings.
func init() {
	pipeline.ServiceInputs[input.ServiceDockerStdoutPluginName] = func() pipeline.ServiceInput {
		return &ServiceDockerStdout{
			FlushIntervalMs:      3000,
			SaveCheckPointSec:    60,
			ReadIntervalMs:       1000,
			Stdout:               true,
			Stderr:               true,
			BeginLineTimeoutMs:   3000,
			LogtailInDocker:      true,
			CloseUnChangedSec:    60,
			BeginLineCheckLength: 10 * 1024,
			MaxLogSize:           512 * 1024,
			StartLogMaxOffset:    128 * 1024,
		}
	}
}