in plugins/inputs/logfile/logfile.go [148:278]
func (t *LogFile) FindLogSrc() []logs.LogSrc {
if !t.started {
t.Log.Warn("not started with file state folder %s", t.FileStateFolder)
return nil
}
var srcs []logs.LogSrc
t.cleanUpStoppedTailerSrc()
es := entitystore.GetEntityStore()
// Create a "tailer" for each file
for i := range t.FileConfig {
fileconfig := &t.FileConfig[i]
//Add file -> {serviceName, deploymentEnvironment} mapping to entity store
if es != nil {
es.AddServiceAttrEntryForLogFile(entitystore.LogFileGlob(fileconfig.FilePath), fileconfig.ServiceName, fileconfig.Environment)
}
targetFiles, err := t.getTargetFiles(fileconfig)
if err != nil {
t.Log.Errorf("Failed to find target files for file config %v, with error: %v", fileconfig.FilePath, err)
}
for _, filename := range targetFiles {
dests, ok := t.configs[fileconfig]
if !ok {
dests = make(map[string]*tailerSrc)
t.configs[fileconfig] = dests
}
if _, ok := dests[filename]; ok {
continue
} else if fileconfig.AutoRemoval {
// This logic means auto_removal does not work with publish_multi_logs
for _, dst := range dests {
// Stop all other tailers in favor of the newly found file
dst.tailer.StopAtEOF()
}
}
var seekFile *tail.SeekInfo
offset, err := t.restoreState(filename)
if err == nil { // Missing state file would be an error too
seekFile = &tail.SeekInfo{Whence: io.SeekStart, Offset: offset}
} else if !fileconfig.Pipe && !fileconfig.FromBeginning {
seekFile = &tail.SeekInfo{Whence: io.SeekEnd, Offset: 0}
}
isutf16 := false
if fileconfig.Encoding == "utf-16" || fileconfig.Encoding == "utf-16le" || fileconfig.Encoding == "UTF-16" || fileconfig.Encoding == "UTF-16LE" {
isutf16 = true
}
tailer, err := tail.TailFile(filename,
tail.Config{
ReOpen: false,
Follow: true,
Location: seekFile,
MustExist: true,
Pipe: fileconfig.Pipe,
Poll: true,
MaxLineSize: fileconfig.MaxEventSize,
IsUTF16: isutf16,
})
if err != nil {
t.Log.Errorf("Failed to tail file %v with error: %v", filename, err)
continue
}
var mlCheck func(string) bool
if fileconfig.MultiLineStartPattern != "" {
mlCheck = fileconfig.isMultilineStart
}
groupName := fileconfig.LogGroupName
streamName := fileconfig.LogStreamName
// In case of multilog, the group and stream has to be generated here
// since it is based on the actual file name
if fileconfig.PublishMultiLogs {
if groupName == "" {
groupName = generateLogGroupName(filename)
} else {
streamName = generateLogStreamName(filename, fileconfig.LogStreamName)
}
}
destination := fileconfig.Destination
if destination == "" {
destination = t.Destination
}
src := NewTailerSrc(
groupName, streamName,
t.Destination,
t.getStateFilePath(filename),
fileconfig.LogGroupClass,
fileconfig.FilePath,
tailer,
fileconfig.AutoRemoval,
mlCheck,
fileconfig.Filters,
fileconfig.timestampFromLogLine,
fileconfig.Enc,
fileconfig.MaxEventSize,
fileconfig.TruncateSuffix,
fileconfig.RetentionInDays,
fileconfig.BackpressureMode,
)
src.AddCleanUpFn(func(ts *tailerSrc) func() {
return func() {
select {
case <-t.done: // No clean up needed after input plugin is stopped
case t.removeTailerSrcCh <- ts:
}
}
}(src))
srcs = append(srcs, src)
dests[filename] = src
}
}
return srcs
}