plugins/input/docker/rawstdout/input_docker_stdout.go

// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rawstdout

import (
	"bufio"
	"context"
	"io"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/docker/docker/api/types"
	docker "github.com/docker/docker/client"
	"github.com/docker/docker/pkg/stdcopy"

	"github.com/alibaba/ilogtail/pkg/helper"
	"github.com/alibaba/ilogtail/pkg/logger"
	"github.com/alibaba/ilogtail/pkg/pipeline"
	"github.com/alibaba/ilogtail/pkg/util"
)

func logDriverSupported(container types.ContainerJSON) bool {
	// containerd has no hostConfig, return true
	if container.HostConfig == nil {
		return true
	}
	switch container.HostConfig.LogConfig.Type {
	case "json-file", "journald":
		return true
	default:
		return false
	}
}

type StdoutCheckPoint struct {
	lock          sync.Mutex
	checkpointMap map[string]string
}

func (sc *StdoutCheckPoint) DeleteCheckPoint(id string) {
	sc.lock.Lock()
	defer sc.lock.Unlock()
	delete(sc.checkpointMap, id)
}

func (sc *StdoutCheckPoint) SaveCheckPoint(id, checkpoint string) {
	sc.lock.Lock()
	defer sc.lock.Unlock()
	sc.checkpointMap[id] = checkpoint
}

func (sc *StdoutCheckPoint) GetAllCheckPoint() map[string]string {
	rst := make(map[string]string)
	sc.lock.Lock()
	defer sc.lock.Unlock()
	for key, val := range sc.checkpointMap {
		rst[key] = val
	}
	return rst
}

func (sc *StdoutCheckPoint) GetCheckPoint(id string) string {
	sc.lock.Lock()
	defer sc.lock.Unlock()
	return sc.checkpointMap[id]
}

type stdoutSyner struct {
	ExternalEnvTag      map[string]string
	ExternalK8sLabelTag map[string]string

	info             *helper.DockerInfoDetail
	client           *docker.Client
	startCheckPoint  string
	lock             sync.Mutex
	stdoutCheckPoint *StdoutCheckPoint
	context          pipeline.Context
	runtimeContext   context.Context
	cancelFun        context.CancelFunc
	wg               sync.WaitGroup

	beginLineReg         *regexp.Regexp
	beginLineTimeout     time.Duration
	beginLineCheckLength int
	maxLogSize           int
	stdout               bool
	stderr               bool
}
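
// newContainerPump spawns one goroutine per non-nil stream (stdout/stderr).
// Each line read from the Docker log stream is split into its timestamp and
// content. Without beginLineReg every line is emitted as a single log; with
// it, lines are buffered and merged until a line matching the regex, a read
// timeout, or an oversized buffer flushes the accumulated multiline log.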
func (ss *stdoutSyner) newContainerPump(c pipeline.Collector, stdout, stderr *io.PipeReader) {
	pump := func(source string, tags map[string]string, input io.Reader) {
		ss.wg.Add(1)
		defer ss.wg.Done()
		keys := []string{"_time_", "_source_", "content"}
		values := []string{"", source, ""}
		if ss.beginLineReg == nil {
			buf := bufio.NewReader(input)
			for {
				line, err := buf.ReadString('\n')
				if err != nil {
					if err != io.EOF && err != io.ErrClosedPipe {
						logger.Warning(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "stdoutSyner done, id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status(), "source", source, "error", err)
					}
					logger.Debug(ss.context.GetRuntimeContext(), "docker source stop", source, "id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status(), "error", err)
					return
				}
				if index := strings.IndexByte(line, ' '); index > 1 && index < len(line)-1 {
					values[0] = line[0:index]
					values[2] = line[index+1 : len(line)-1]
					ss.lock.Lock()
					ss.startCheckPoint = values[0] // TODO: this is not correct, stdout and stderr should have separate checkpoints
					ss.lock.Unlock()
				} else {
					values[0] = ""
					values[2] = line
				}
				c.AddDataArray(tags, keys, values)
			}
		} else {
			buf := bufio.NewReader(util.NewTimeoutReader(input, ss.beginLineTimeout))
			for {
				line, err := buf.ReadString('\n')
				if err == util.ErrReaderTimeout {
					// process timeout
					if len(values[2]) > 0 {
						logger.Debug(ss.context.GetRuntimeContext(), "log time out, treat as full log", util.CutString(values[2], 512))
						c.AddDataArray(tags, keys, values)
						values[2] = line
					} else {
						values[2] += line
					}
					continue
				}
				if err != nil {
					if err != io.EOF && err != io.ErrClosedPipe {
						logger.Warning(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "stdoutSyner done, id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status(), "source", source, "error", err)
					}
					logger.Debug(ss.context.GetRuntimeContext(), "docker source stop", source, "name", ss.info.ContainerInfo.Name, "error", err)
					if len(values[2]) > 0 {
						logger.Info(ss.context.GetRuntimeContext(), "flush out last line", util.CutString(values[2], 512))
						c.AddDataArray(tags, keys, values)
					}
					return
				}
				// full line, check regex match result
				var logLine string
				if index := strings.IndexByte(line, ' '); index > 1 && index < len(line)-1 {
					values[0] = line[0:index]
					logLine = line[index+1 : len(line)-1]
					ss.lock.Lock()
					ss.startCheckPoint = values[0] // TODO: checkpoint before regex check, too early
					ss.lock.Unlock()
				} else {
					values[0] = ""
					logLine = line
				}
				var checkLine string
				if len(logLine) > ss.beginLineCheckLength {
					checkLine = logLine[0:ss.beginLineCheckLength]
				} else {
					checkLine = logLine
				}
				if matchRst := ss.beginLineReg.FindStringIndex(checkLine); len(matchRst) >= 2 && matchRst[0] == 0 && matchRst[1] == len(checkLine) {
					// match begin line
					if len(values[2]) > 0 {
						// new line
						c.AddDataArray(tags, keys, values)
					}
					values[2] = logLine
				} else {
					// not match begin line, wait for next line
					values[2] += "\n"
					values[2] += logLine
					// check very big line
					if len(values[2]) >= ss.maxLogSize {
						logger.Warning(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "log line is too long, force flush out", len(values[2]), "log prefix", util.CutString(values[2], 4096))
						c.AddDataArray(tags, keys, values)
						values[2] = ""
					}
				}
			}
		}
	}

	tags := ss.info.GetExternalTags(ss.ExternalEnvTag, ss.ExternalK8sLabelTag)
	for k, v := range ss.info.ContainerNameTag {
		tags[k] = v
	}
	if stdout != nil {
		go pump("stdout", tags, stdout)
	}
	if stderr != nil {
		go pump("stderr", tags, stderr)
	}
}

// Start tails a single container: it resumes from the saved checkpoint
// timestamp (or from now-10s on first start), opens a following ContainerLogs
// stream, demultiplexes it via stdcopy unless the container uses a TTY, and
// retries with a randomized 10-second backoff on unexpected errors.
func (ss *stdoutSyner) Start(c pipeline.Collector) {
	if ss.beginLineCheckLength <= 0 {
		ss.beginLineCheckLength = 10 * 1024
	}
	if ss.maxLogSize <= 0 {
		ss.maxLogSize = 512 * 1024
	}
	ss.wg.Add(1)
	defer ss.wg.Done()
	for {
		var cpTime time.Time
		ss.lock.Lock()
		if len(ss.startCheckPoint) > 0 {
			var err error
			if cpTime, err = time.Parse(helper.DockerTimeFormat, ss.startCheckPoint); err != nil {
				logger.Warning(ss.context.GetRuntimeContext(), "CHECKPOINT_ALARM", "docker stdout raw parse start time error", ss.startCheckPoint, "id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status())
			} else {
				logger.Info(ss.context.GetRuntimeContext(), "docker stdout raw recover since", ss.startCheckPoint, "id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status())
			}
		}
		if cpTime.IsZero() {
			// if first start, skip 10 second
			cpTime = time.Now().Add(time.Second * time.Duration(-10))
			logger.Info(ss.context.GetRuntimeContext(), "docker stdout raw first read since", cpTime.Format(helper.DockerTimeFormat), "id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status())
		}
		ss.lock.Unlock()
		options := types.ContainerLogsOptions{
			ShowStdout: ss.stdout,
			ShowStderr: ss.stderr,
			Since:      cpTime.Format(helper.DockerTimeFormat),
			Timestamps: true,
			Follow:     true,
		}
		var outrd, errrd *io.PipeReader
		var outwr, errwr *io.PipeWriter
		outrd, outwr = io.Pipe()
		if ss.info.ContainerInfo.Config.Tty {
			options.ShowStderr = false
		}
		if options.ShowStdout {
			outrd, outwr = io.Pipe()
		}
		if options.ShowStderr {
			errrd, errwr = io.Pipe()
		}
		logger.Info(ss.context.GetRuntimeContext(), "docker stdout raw", "begin", "id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status())
		// start pump logs goroutines
		ss.newContainerPump(c, outrd, errrd)
		// loop to copy logs to parser
		logReader, err := ss.client.ContainerLogs(ss.runtimeContext, ss.info.ContainerInfo.ID, options)
		if err != nil {
			logger.Errorf(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "open container log error=%v, id:%v\tname:%v\tcreated:%v\tstatus:%v", err.Error(), ss.info.IDPrefix(), ss.info.ContainerInfo.Name, ss.info.ContainerInfo.Created, ss.info.Status())
			break
		}
		var written int64
		if ss.info.ContainerInfo.Config.Tty {
			written, err = io.Copy(outwr, logReader)
			logger.Debugf(ss.context.GetRuntimeContext(), "read container log bytes=%v, id:%v\tname:%v\tcreated:%v\tstatus:%v", written, ss.info.IDPrefix(), ss.info.ContainerInfo.Name, ss.info.ContainerInfo.Created, ss.info.Status())
			if err != nil && err != context.Canceled {
				logger.Errorf(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "read container log error=%v, id:%v\tname:%v\tcreated:%v\tstatus:%v", err.Error(), ss.info.IDPrefix(), ss.info.ContainerInfo.Name, ss.info.ContainerInfo.Created, ss.info.Status())
			}
		} else {
			written, err = stdcopy.StdCopy(outwr, errwr, logReader)
			logger.Debugf(ss.context.GetRuntimeContext(), "read container log bytes=%v, id:%v\tname:%v\tcreated:%v\tstatus:%v", written, ss.info.IDPrefix(), ss.info.ContainerInfo.Name, ss.info.ContainerInfo.Created, ss.info.Status())
			if err != nil && err != context.Canceled {
				logger.Errorf(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "read container log error=%v, id:%v\tname:%v\tcreated:%v\tstatus:%v", err.Error(), ss.info.IDPrefix(), ss.info.ContainerInfo.Name, ss.info.ContainerInfo.Created, ss.info.Status())
			}
		}
		// loop broken if container exits
		if closeErr := logReader.Close(); closeErr != nil {
			logger.Warningf(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "close container log error=%v, id:%v\tname:%v\tcreated:%v\tstatus:%v", closeErr, ss.info.IDPrefix(), ss.info.ContainerInfo.Name, ss.info.ContainerInfo.Created, ss.info.Status())
		}
		_ = outrd.CloseWithError(io.EOF)
		_ = errrd.CloseWithError(io.EOF)
		ss.lock.Lock()
		if len(ss.startCheckPoint) > 0 {
			ss.stdoutCheckPoint.SaveCheckPoint(ss.info.ContainerInfo.ID, ss.startCheckPoint)
		}
		ss.lock.Unlock()
		switch err {
		case nil, context.Canceled, context.DeadlineExceeded:
			logger.Info(ss.context.GetRuntimeContext(), "docker stdout raw", "stop", "id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status())
			return
		default:
			// after sleep, we need recheck if runtime context is done
			logger.Warning(ss.context.GetRuntimeContext(), "DOCKER_STDOUT_STOP_ALARM", "stdoutSyner stop, retry after 10 seconds, id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status(), "error", err)
			if util.RandomSleep(time.Second*time.Duration(10), 0.1, ss.runtimeContext.Done()) {
				logger.Info(ss.context.GetRuntimeContext(), "docker stdout raw", "stop", "id", ss.info.IDPrefix(), "name", ss.info.ContainerInfo.Name, "created", ss.info.ContainerInfo.Created, "status", ss.info.Status())
				return
			}
		}
	}
}

func (ss *stdoutSyner) Stop() {
	ss.cancelFun()
	ss.wg.Wait()
}

func (ss *stdoutSyner) Update() {
}

// ServiceDockerStdout is the "service_docker_stdout_raw" service input. It
// periodically matches running containers against the configured
// container/env/K8s filters and runs one stdoutSyner per matched container,
// persisting per-container read positions through StdoutCheckPoint.
type ServiceDockerStdout struct {
	IncludeLabel          map[string]string // Deprecated: use IncludeContainerLabel and IncludeK8sLabel instead.
	ExcludeLabel          map[string]string // Deprecated: use ExcludeContainerLabel and ExcludeK8sLabel instead.
	IncludeEnv            map[string]string
	ExcludeEnv            map[string]string
	IncludeContainerLabel map[string]string
	ExcludeContainerLabel map[string]string
	IncludeK8sLabel       map[string]string
	ExcludeK8sLabel       map[string]string
	ExternalEnvTag        map[string]string
	ExternalK8sLabelTag   map[string]string
	FlushIntervalMs       int
	TimeoutMs             int
	BeginLineRegex        string
	BeginLineTimeoutMs    int
	BeginLineCheckLength  int
	MaxLogSize            int
	Stdout                bool
	Stderr                bool
	K8sNamespaceRegex     string
	K8sPodRegex           string
	K8sContainerRegex     string

	// export from ilogtail-trace component
	IncludeLabelRegex map[string]*regexp.Regexp
	ExcludeLabelRegex map[string]*regexp.Regexp
	IncludeEnvRegex   map[string]*regexp.Regexp
	ExcludeEnvRegex   map[string]*regexp.Regexp
	K8sFilter         *helper.K8SFilter

	synerMap         map[string]*stdoutSyner
	client           *docker.Client
	shutdown         chan struct{}
	waitGroup        sync.WaitGroup
	context          pipeline.Context
	runtimeContext   context.Context
	stdoutCheckPoint *StdoutCheckPoint
}

func (sds *ServiceDockerStdout) Init(context pipeline.Context) (int, error) {
	sds.context = context
	helper.ContainerCenterInit()
	sds.synerMap = make(map[string]*stdoutSyner)

	var err error
	sds.IncludeEnv, sds.IncludeEnvRegex, err = helper.SplitRegexFromMap(sds.IncludeEnv)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include env regex error", err)
	}
	sds.ExcludeEnv, sds.ExcludeEnvRegex, err = helper.SplitRegexFromMap(sds.ExcludeEnv)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude env regex error", err)
	}
	if sds.IncludeLabel != nil {
		for k, v := range sds.IncludeContainerLabel {
			sds.IncludeLabel[k] = v
		}
	} else {
		sds.IncludeLabel = sds.IncludeContainerLabel
	}
	if sds.ExcludeLabel != nil {
		for k, v := range sds.ExcludeContainerLabel {
			sds.ExcludeLabel[k] = v
		}
	} else {
		sds.ExcludeLabel = sds.ExcludeContainerLabel
	}
	sds.IncludeLabel, sds.IncludeLabelRegex, err = helper.SplitRegexFromMap(sds.IncludeLabel)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init include label regex error", err)
	}
	sds.ExcludeLabel, sds.ExcludeLabelRegex, err = helper.SplitRegexFromMap(sds.ExcludeLabel)
	if err != nil {
		logger.Warning(sds.context.GetRuntimeContext(), "INVALID_REGEX_ALARM", "init exclude label regex error", err)
	}
	sds.K8sFilter, err = helper.CreateK8SFilter(sds.K8sNamespaceRegex, sds.K8sPodRegex, sds.K8sContainerRegex, sds.IncludeK8sLabel, sds.ExcludeK8sLabel)
	return 0, err
}

func (sds *ServiceDockerStdout) Description() string {
	return "docker stdout raw input plugin for logtail"
}

// Collect takes in an accumulator and adds the metrics that the Input
// gathers. This is called every "interval".
func (sds *ServiceDockerStdout) Collect(pipeline.Collector) error {
	return nil
}
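
// FlushAll reconciles running stdoutSyners with the containers that currently
// match the configured filters: it starts a syner for every newly matched
// container (or for all of them when firstStart is true), refreshes the
// checkpoints of existing ones, and stops syners whose containers are gone.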
func (sds *ServiceDockerStdout) FlushAll(c pipeline.Collector, firstStart bool) error {
	var err error
	dockerInfos := helper.GetContainerByAcceptedInfo(sds.IncludeLabel, sds.ExcludeLabel, sds.IncludeLabelRegex, sds.ExcludeLabelRegex, sds.IncludeEnv, sds.ExcludeEnv, sds.IncludeEnvRegex, sds.ExcludeEnvRegex, sds.K8sFilter)
	logger.Debug(sds.context.GetRuntimeContext(), "flush all", len(dockerInfos))
	for id, info := range dockerInfos {
		if !logDriverSupported(info.ContainerInfo) {
			continue
		}
		if syner, ok := sds.synerMap[id]; !ok || firstStart {
			runtimeContext, cancelFun := context.WithCancel(sds.runtimeContext)
			var reg *regexp.Regexp
			if len(sds.BeginLineRegex) > 0 {
				if reg, err = regexp.Compile(sds.BeginLineRegex); err != nil {
					logger.Warning(sds.context.GetRuntimeContext(), "REGEX_COMPILE_ALARM", "compile begin line regex error, regex", sds.BeginLineRegex, "error", err)
				}
			}
			syner = &stdoutSyner{
				info:                 info,
				client:               sds.client,
				stdoutCheckPoint:     sds.stdoutCheckPoint,
				startCheckPoint:      sds.stdoutCheckPoint.GetCheckPoint(id),
				context:              sds.context,
				runtimeContext:       runtimeContext,
				cancelFun:            cancelFun,
				beginLineReg:         reg,
				beginLineCheckLength: sds.BeginLineCheckLength,
				beginLineTimeout:     time.Duration(sds.BeginLineTimeoutMs) * time.Millisecond,
				maxLogSize:           sds.MaxLogSize,
				stdout:               sds.Stdout,
				stderr:               sds.Stderr,
				ExternalEnvTag:       sds.ExternalEnvTag,
				ExternalK8sLabelTag:  sds.ExternalK8sLabelTag,
			}
			sds.synerMap[id] = syner
			go syner.Start(c)
		} else {
			syner.lock.Lock()
			if len(syner.startCheckPoint) > 0 {
				sds.stdoutCheckPoint.SaveCheckPoint(syner.info.ContainerInfo.ID, syner.startCheckPoint)
			}
			syner.lock.Unlock()
		}
	}

	// delete container
	for id, syner := range sds.synerMap {
		if _, ok := dockerInfos[id]; !ok {
			logger.Info(sds.context.GetRuntimeContext(), "delete docker stdout raw, id", id, "name", syner.info.ContainerInfo.Name)
			syner.Stop()
			delete(sds.synerMap, id)
			sds.stdoutCheckPoint.DeleteCheckPoint(id)
		}
	}
	return err
}

func (sds *ServiceDockerStdout) SaveCheckPoint() error {
	logger.Debug(sds.context.GetRuntimeContext(), "stdout raw save checkpoint", sds.stdoutCheckPoint.checkpointMap)
	return sds.context.SaveCheckPointObject("service_docker_stdout", sds.stdoutCheckPoint.checkpointMap)
}
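
// GetCheckPoint lazily restores the persisted container-ID-to-timestamp map
// from the plugin checkpoint store, falling back to an empty map when nothing
// has been saved before.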
func (sds *ServiceDockerStdout) GetCheckPoint() *StdoutCheckPoint {
	if sds.stdoutCheckPoint != nil {
		return sds.stdoutCheckPoint
	}
	stdoutCheckPoint := &StdoutCheckPoint{}
	sds.context.GetCheckPointObject("service_docker_stdout", &stdoutCheckPoint.checkpointMap)
	if stdoutCheckPoint.checkpointMap == nil {
		logger.Debug(sds.context.GetRuntimeContext(), "stdout raw get checkpoint", "failed")
		stdoutCheckPoint.checkpointMap = make(map[string]string)
	} else {
		logger.Debug(sds.context.GetRuntimeContext(), "stdout raw get checkpoint", stdoutCheckPoint.checkpointMap)
	}
	return stdoutCheckPoint
}

// Start starts the ServiceInput's service, whatever that may be
func (sds *ServiceDockerStdout) Start(c pipeline.Collector) error {
	sds.shutdown = make(chan struct{})
	sds.waitGroup.Add(1)
	defer sds.waitGroup.Done()

	sds.stdoutCheckPoint = sds.GetCheckPoint()
	var err error
	if sds.client, err = helper.CreateDockerClient(); err != nil {
		logger.Error(sds.context.GetRuntimeContext(), "DOCKER_CLIENT_ALARM", "create docker client error", err)
		return err
	}

	var cancelFun context.CancelFunc
	sds.runtimeContext, cancelFun = context.WithCancel(context.Background())
	_ = sds.FlushAll(c, true)
	for {
		timer := time.NewTimer(time.Duration(sds.FlushIntervalMs) * time.Millisecond)
		select {
		case <-sds.shutdown:
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout raw main runtime stop", "begin")
			for _, syner := range sds.synerMap {
				syner.Stop()
			}
			cancelFun()
			logger.Info(sds.context.GetRuntimeContext(), "docker stdout raw main runtime stop", "success")
			return nil
		case <-timer.C:
			_ = sds.SaveCheckPoint()
			_ = sds.FlushAll(c, false)
		}
	}
}

// Stop stops the services and closes any necessary channels and connections
func (sds *ServiceDockerStdout) Stop() error {
	close(sds.shutdown)
	sds.waitGroup.Wait()
	_ = sds.SaveCheckPoint()
	return nil
}

func init() {
	pipeline.ServiceInputs["service_docker_stdout_raw"] = func() pipeline.ServiceInput {
		return &ServiceDockerStdout{
			FlushIntervalMs:      3000,
			TimeoutMs:            3000,
			Stdout:               true,
			Stderr:               true,
			BeginLineTimeoutMs:   3000,
			BeginLineCheckLength: 10 * 1024,
		}
	}
}
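
// Usage sketch (illustrative, not part of the original file): the plugin
// runner is assumed to look up the factory registered in init() by its type
// name and to unmarshal user configuration onto the returned struct before
// calling Init and Start, e.g.
//
//	creator := pipeline.ServiceInputs["service_docker_stdout_raw"]
//	input := creator().(*ServiceDockerStdout) // defaults: FlushIntervalMs=3000, Stdout=true, Stderr=true
//	input.BeginLineRegex = `\[\d{4}-\d{2}-\d{2}.*` // hypothetical begin-of-log pattern for multiline merging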