in logs/logs.go [84:141]
// Run connects log sources to log backends and keeps doing so until ctx is
// done.
//
// It works in three phases:
//  1. Every configured output that implements LogBackend is stored in
//     l.backends, keyed by its alias, or by its plugin name when the alias is
//     empty.
//  2. Every configured input that implements LogCollection is started and
//     appended to l.collections.
//  3. Once per second each collection is asked for log sources; every source
//     whose destination name matches a registered backend gets a destination
//     from that backend and is handed to runSrcToDest in its own goroutine.
//
// Run blocks and returns only when ctx.Done() fires.
func (l *LogAgent) Run(ctx context.Context) {
	log.Printf("I! [logagent] starting")

	// Phase 1: index the outputs that can act as log backends.
	for _, out := range l.Config.Outputs {
		if backend, isBackend := out.Output.(LogBackend); isBackend {
			log.Printf("I! [logagent] found plugin %v is a log backend", out.Config.Name)
			// The alias wins over the plugin name when one is configured.
			key := out.Config.Name
			if alias := out.Config.Alias; alias != "" {
				key = alias
			}
			l.backends[key] = backend
		}
	}

	// Phase 2: start the inputs that can produce log sources.
	for _, in := range l.Config.Inputs {
		collection, isCollection := in.Input.(LogCollection)
		if !isCollection {
			continue
		}
		log.Printf("I! [logagent] found plugin %v is a log collection", in.Config.Name)
		if err := collection.Start(nil); err != nil {
			log.Printf("E! could not start log collection %v err %v", in.Config.Name, err)
		}
		// A collection is tracked even when Start reported an error, so it is
		// still polled below.
		l.collections = append(l.collections, collection)
	}

	// pipeSources performs one polling round over all tracked collections.
	// NOTE(review): presumably FindLogSrc only returns sources not seen
	// before; otherwise each round would spawn duplicate goroutines - confirm.
	pipeSources := func() {
		log.Printf("D! [logagent] open file count, %v", tail.OpenFileCount.Load())
		for _, collection := range l.collections {
			for _, src := range collection.FindLogSrc() {
				destName := src.Destination()
				group := src.Group()
				stream := src.Stream()
				desc := src.Description()
				retention := src.Retention()
				groupClass := src.Class()

				backend, found := l.backends[destName]
				if !found {
					// Sources pointing at an unknown destination are logged
					// and skipped.
					log.Printf("E! [logagent] Failed to find destination %s for log source %s/%s(%s) ", destName, group, stream, desc)
					continue
				}

				effectiveRetention := l.checkRetentionAlreadyAttempted(retention, group)
				dest := backend.CreateDest(group, stream, effectiveRetention, groupClass, src)
				l.destNames[dest] = destName
				log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", group, stream, desc, destName, effectiveRetention)
				go l.runSrcToDest(src, dest)
			}
		}
	}

	// Phase 3: poll once per second until the context is done.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			pipeSources()
		}
	}
}