in plugins/inputs/logfile/tailersrc.go [202:277]
func (ts *tailerSrc) runTail() {
defer ts.cleanUp()
t := time.NewTicker(multilineWaitPeriod)
defer t.Stop()
var init string
var msgBuf bytes.Buffer
var cnt int
fo := &fileOffset{}
ignoreUntilNextEvent := false
for {
select {
case line, ok := <-ts.tailer.Lines:
if !ok {
ts.publishEvent(msgBuf, fo)
return
}
if line.Err != nil {
log.Printf("E! [logfile] Error tailing line in file %s, Error: %s\n", ts.tailer.Filename, line.Err)
continue
}
text := line.Text
if ts.enc != nil {
var err error
text, err = ts.enc.NewDecoder().String(text)
if err != nil {
log.Printf("E! [logfile] Cannot decode the log file content for %s: %v\n", ts.tailer.Filename, err)
continue
}
}
if ts.isMLStart == nil {
msgBuf.Reset()
msgBuf.WriteString(text)
fo.SetOffset(line.Offset)
init = ""
} else if ts.isMLStart(text) || (!ignoreUntilNextEvent && msgBuf.Len() == 0) {
init = text
ignoreUntilNextEvent = false
} else if ignoreUntilNextEvent || msgBuf.Len() >= ts.maxEventSize {
ignoreUntilNextEvent = true
fo.SetOffset(line.Offset)
continue
} else {
msgBuf.WriteString("\n")
msgBuf.WriteString(text)
if msgBuf.Len() > ts.maxEventSize {
msgBuf.Truncate(ts.maxEventSize - len(ts.truncateSuffix))
msgBuf.WriteString(ts.truncateSuffix)
}
fo.SetOffset(line.Offset)
continue
}
ts.publishEvent(msgBuf, fo)
msgBuf.Reset()
msgBuf.WriteString(init)
fo.SetOffset(line.Offset)
cnt = 0
case <-t.C:
if msgBuf.Len() > 0 {
cnt++
}
if cnt >= 5 {
ts.publishEvent(msgBuf, fo)
msgBuf.Reset()
cnt = 0
}
case <-ts.done:
return
}
}
}