func()

in logs/logs.go [84:141]


// Run wires up the log pipeline and then polls for log sources until ctx is
// cancelled.
//
// It proceeds in three phases:
//  1. Every configured output that implements LogBackend is stored in
//     l.backends, keyed by the output's alias, or by its name when the alias
//     is empty.
//  2. Every configured input that implements LogCollection is started and
//     appended to l.collections.
//  3. Once per second each collection is asked for log sources; for every
//     source a destination is created on the matching backend and a goroutine
//     is launched to pipe the source into that destination.
//
// Run blocks and returns only when ctx is done.
func (l *LogAgent) Run(ctx context.Context) {
	log.Printf("I! [logagent] starting")

	// Phase 1: index the outputs that can act as log backends.
	for _, out := range l.Config.Outputs {
		if be, isBackend := out.Output.(LogBackend); isBackend {
			log.Printf("I! [logagent] found plugin %v is a log backend", out.Config.Name)
			// The alias takes precedence over the plugin name as the lookup key.
			key := out.Config.Name
			if alias := out.Config.Alias; alias != "" {
				key = alias
			}
			l.backends[key] = be
		}
	}

	// Phase 2: start the inputs that can act as log collections.
	for _, in := range l.Config.Inputs {
		coll, isCollection := in.Input.(LogCollection)
		if !isCollection {
			continue
		}
		log.Printf("I! [logagent] found plugin %v is a log collection", in.Config.Name)
		if err := coll.Start(nil); err != nil {
			log.Printf("E! could not start log collection %v err %v", in.Config.Name, err)
		}
		// The collection is tracked even when Start reported an error, so it
		// is still polled below.
		l.collections = append(l.collections, coll)
	}

	// Phase 3: poll the collections once per second until cancelled.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Fall through to the polling work below.
		}

		log.Printf("D! [logagent] open file count, %v", tail.OpenFileCount.Load())
		for _, coll := range l.collections {
			for _, src := range coll.FindLogSrc() {
				// Read all source attributes up front; they feed both the
				// error message and the destination creation.
				destName := src.Destination()
				group := src.Group()
				stream := src.Stream()
				desc := src.Description()
				retention := src.Retention()
				class := src.Class()

				be, found := l.backends[destName]
				if !found {
					log.Printf("E! [logagent] Failed to find destination %s for log source %s/%s(%s) ", destName, group, stream, desc)
					continue
				}

				// The retention value may be replaced by the helper before it
				// is handed to the backend.
				retention = l.checkRetentionAlreadyAttempted(retention, group)
				dest := be.CreateDest(group, stream, retention, class, src)
				// Remember which named destination this dest object belongs to.
				l.destNames[dest] = destName
				log.Printf("I! [logagent] piping log from %s/%s(%s) to %s with retention %d", group, stream, desc, destName, retention)
				// One goroutine per returned source; Run does not wait for it.
				go l.runSrcToDest(src, dest)
			}
		}
	}
}