func()

in plugins/inputs/logfile/tailersrc.go [159:272]


// runTail is the tailing loop for a single source. It receives lines from
// ts.tailer.Lines, assembles them into LogEvents and passes every event that
// ShouldPublish accepts to ts.outputFn. ts.cleanUp is deferred and runs when
// the loop exits.
//
// Event assembly:
//   - When ts.isMLStart is nil, each line becomes its own event.
//   - Otherwise a line starts a new event if ts.isMLStart reports true for it,
//     or if nothing is buffered and lines are not currently being skipped.
//     Any other line is appended to the buffered event after a "\n".
//   - Once the buffered event exceeds ts.maxEventSize it is cut back and
//     ts.truncateSuffix is appended; subsequent continuation lines are dropped
//     (only the offset is advanced) until the next start line.
//   - A buffered event is also flushed after the ticker has fired five times
//     while data was buffered since the last flush. Appending a continuation
//     line does not reset that count.
//
// The loop ends when ts.tailer.Lines is closed (any buffered event is flushed
// first) or when ts.done is signalled (buffered data is not flushed).
func (ts *tailerSrc) runTail() {
	defer ts.cleanUp()

	ticker := time.NewTicker(multilineWaitPeriod)
	defer ticker.Stop()

	var (
		pending     bytes.Buffer // text of the event currently being assembled
		nextStart   string       // first line of the following event ("" in single-line mode)
		idleTicks   int          // ticker fires seen with data buffered since the last flush
		skipToStart bool         // true while dropping continuation lines of an oversized event
	)
	offset := &fileOffset{}

	// emit wraps the buffered text in a LogEvent and publishes it if it passes
	// the filters. Callers decide whether an empty buffer should be emitted.
	emit := func() {
		body := pending.String()
		evt := &LogEvent{
			msg:    body,
			t:      ts.timestampFn(body),
			offset: *offset,
			src:    ts,
		}
		// Filtering only looks at the (possibly truncated) buffered message, so
		// the full original log message never needs to be loaded for it.
		if ShouldPublish(ts.group, ts.stream, ts.filters, evt) {
			ts.outputFn(evt)
		}
	}

	for {
		select {
		case line, ok := <-ts.tailer.Lines:
			if !ok {
				// Channel closed: flush whatever is left, then stop.
				if pending.Len() > 0 {
					emit()
				}
				return
			}

			if line.Err != nil {
				log.Printf("E! [logfile] Error tailing line in file %s, Error: %s\n", ts.tailer.Filename, line.Err)
				continue
			}

			text := line.Text
			if ts.enc != nil {
				decoded, err := ts.enc.NewDecoder().String(text)
				if err != nil {
					log.Printf("E! [logfile] Cannot decode the log file content for %s: %v\n", ts.tailer.Filename, err)
					continue
				}
				text = decoded
			}

			switch {
			case ts.isMLStart == nil:
				// Single-line mode: this line is the complete event.
				pending.Reset()
				pending.WriteString(text)
				offset.SetOffset(line.Offset)
				nextStart = ""

			case ts.isMLStart(text) || (!skipToStart && pending.Len() == 0):
				// Start of a new event; the buffered one (if any) is flushed below.
				nextStart = text
				skipToStart = false

			case skipToStart || pending.Len() >= ts.maxEventSize:
				// Event is already full: drop the line but keep the offset moving.
				skipToStart = true
				offset.SetOffset(line.Offset)
				continue

			default:
				// Continuation line: append and truncate if the limit is exceeded.
				pending.WriteByte('\n')
				pending.WriteString(text)
				if pending.Len() > ts.maxEventSize {
					pending.Truncate(ts.maxEventSize - len(ts.truncateSuffix))
					pending.WriteString(ts.truncateSuffix)
				}
				offset.SetOffset(line.Offset)
				continue
			}

			if pending.Len() > 0 {
				emit()
			}

			// Seed the buffer with the start line of the next event.
			pending.Reset()
			pending.WriteString(nextStart)
			offset.SetOffset(line.Offset)
			idleTicks = 0

		case <-ticker.C:
			if pending.Len() > 0 {
				idleTicks++
			}

			if idleTicks >= 5 {
				emit()
				pending.Reset()
				idleTicks = 0
			}

		case <-ts.done:
			return
		}
	}
}