func()

in plugins/inputs/logfile/logfile.go [148:278]


// FindLogSrc scans every configured file pattern and returns a log source for
// each matching file that is not already being tailed. Files that already have
// a tailer registered in t.configs are skipped, so only newly created sources
// are returned. It returns nil when the plugin has not been started.
func (t *LogFile) FindLogSrc() []logs.LogSrc {
	if !t.started {
		t.Log.Warnf("not started with file state folder %s", t.FileStateFolder)
		return nil
	}

	var srcs []logs.LogSrc

	t.cleanUpStoppedTailerSrc()

	es := entitystore.GetEntityStore()

	// Create a "tailer" for each file
	for i := range t.FileConfig {
		fileconfig := &t.FileConfig[i]

		// Add file -> {serviceName, deploymentEnvironment} mapping to entity store
		if es != nil {
			es.AddServiceAttrEntryForLogFile(entitystore.LogFileGlob(fileconfig.FilePath), fileconfig.ServiceName, fileconfig.Environment)
		}

		targetFiles, err := t.getTargetFiles(fileconfig)
		if err != nil {
			// Log only: whatever targetFiles holds is still iterated below.
			t.Log.Errorf("Failed to find target files for file config %v, with error: %v", fileconfig.FilePath, err)
		}
		for _, filename := range targetFiles {
			// Lazily create the per-config registry of tailers keyed by file name.
			dests, ok := t.configs[fileconfig]
			if !ok {
				dests = make(map[string]*tailerSrc)
				t.configs[fileconfig] = dests
			}

			if _, ok := dests[filename]; ok {
				// Already tailing this file.
				continue
			} else if fileconfig.AutoRemoval {
				// This logic means auto_removal does not work with publish_multi_logs
				for _, dst := range dests {
					// Stop all other tailers in favor of the newly found file
					dst.tailer.StopAtEOF()
				}
			}

			// Pick the start position: a restored offset wins; otherwise start at
			// the end unless the source is a pipe or from_beginning is requested,
			// in which case seekFile stays nil.
			var seekFile *tail.SeekInfo
			offset, err := t.restoreState(filename)
			if err == nil { // Missing state file would be an error too
				seekFile = &tail.SeekInfo{Whence: io.SeekStart, Offset: offset}
			} else if !fileconfig.Pipe && !fileconfig.FromBeginning {
				seekFile = &tail.SeekInfo{Whence: io.SeekEnd, Offset: 0}
			}

			isutf16 := false
			if fileconfig.Encoding == "utf-16" || fileconfig.Encoding == "utf-16le" || fileconfig.Encoding == "UTF-16" || fileconfig.Encoding == "UTF-16LE" {
				isutf16 = true
			}

			tailer, err := tail.TailFile(filename,
				tail.Config{
					ReOpen:      false,
					Follow:      true,
					Location:    seekFile,
					MustExist:   true,
					Pipe:        fileconfig.Pipe,
					Poll:        true,
					MaxLineSize: fileconfig.MaxEventSize,
					IsUTF16:     isutf16,
				})

			if err != nil {
				t.Log.Errorf("Failed to tail file %v with error: %v", filename, err)
				continue
			}

			// A nil mlCheck is passed when no multi-line start pattern is configured.
			var mlCheck func(string) bool
			if fileconfig.MultiLineStartPattern != "" {
				mlCheck = fileconfig.isMultilineStart
			}

			groupName := fileconfig.LogGroupName
			streamName := fileconfig.LogStreamName

			// In case of multilog, the group and stream has to be generated here
			// since it is based on the actual file name
			if fileconfig.PublishMultiLogs {
				if groupName == "" {
					groupName = generateLogGroupName(filename)
				} else {
					streamName = generateLogStreamName(filename, fileconfig.LogStreamName)
				}
			}

			// The per-file destination overrides the plugin-wide default.
			destination := fileconfig.Destination
			if destination == "" {
				destination = t.Destination
			}

			src := NewTailerSrc(
				groupName, streamName,
				destination,
				t.getStateFilePath(filename),
				fileconfig.LogGroupClass,
				fileconfig.FilePath,
				tailer,
				fileconfig.AutoRemoval,
				mlCheck,
				fileconfig.Filters,
				fileconfig.timestampFromLogLine,
				fileconfig.Enc,
				fileconfig.MaxEventSize,
				fileconfig.TruncateSuffix,
				fileconfig.RetentionInDays,
				fileconfig.BackpressureMode,
			)

			// The clean-up callback hands the source to removeTailerSrcCh, or
			// returns without sending once t.done is ready.
			src.AddCleanUpFn(func(ts *tailerSrc) func() {
				return func() {
					select {
					case <-t.done: // No clean up needed after input plugin is stopped
					case t.removeTailerSrcCh <- ts:
					}

				}
			}(src))

			srcs = append(srcs, src)

			dests[filename] = src
		}
	}

	return srcs
}