plugins/inputs/logfile/tailersrc.go (360 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: MIT
package logfile
import (
"bytes"
"log"
"os"
"strconv"
"sync"
"time"
"golang.org/x/text/encoding"
"github.com/aws/amazon-cloudwatch-agent/extension/entitystore"
"github.com/aws/amazon-cloudwatch-agent/internal/logscommon"
"github.com/aws/amazon-cloudwatch-agent/logs"
"github.com/aws/amazon-cloudwatch-agent/plugins/inputs/logfile/tail"
"github.com/aws/amazon-cloudwatch-agent/sdk/service/cloudwatchlogs"
)
const (
stateFileMode = 0644
tailCloseThreshold = 3 * time.Second
)
var (
multilineWaitPeriod = 1 * time.Second
defaultBufferSize = 1
)
// fileOffset identifies a position in the tailed file. Because a file can be
// truncated (making raw byte offsets go backwards), positions are ordered
// first by a truncation sequence number and then by the byte offset itself.
type fileOffset struct {
	seq, offset int64 // seq is incremented each time the file is truncated
}

// SetOffset records a new byte offset. An offset smaller than the current one
// indicates the file was truncated, so the sequence number is bumped.
func (fo *fileOffset) SetOffset(o int64) {
	truncated := o < fo.offset
	if truncated {
		fo.seq++
	}
	fo.offset = o
}
// LogEvent is a single log event produced by a tailerSrc. Done must be called
// after the event has been delivered so the source can persist its offset.
type LogEvent struct {
	msg    string
	t      time.Time
	offset fileOffset // position just past this event in the source file
	src    *tailerSrc
}

// Message returns the event's message text.
func (le LogEvent) Message() string {
	return le.msg
}

// Time returns the event's timestamp.
func (le LogEvent) Time() time.Time {
	return le.t
}

// Done acknowledges the event, queueing its offset for state persistence.
func (le LogEvent) Done() {
	le.src.Done(le.offset)
}
// tailerSrc adapts a tail.Tail into a logs.LogSrc: it reads lines from the
// tailer, assembles them into (possibly multiline) log events, and persists
// the acknowledged read offset to a state file so tailing can resume across
// restarts.
type tailerSrc struct {
	group              string
	stream             string
	class              string
	fileGlobPath       string
	destination        string
	stateFilePath      string
	tailer             *tail.Tail
	autoRemoval        bool // remove the source file once fully read
	timestampFn        func(string) (time.Time, string)
	enc                encoding.Encoding // nil means no re-decoding of lines
	maxEventSize       int
	truncateSuffix     string
	retentionInDays    int
	outputFn           func(logs.LogEvent)
	isMLStart          func(string) bool // nil means single-line mode
	offsetCh           chan fileOffset   // acknowledged offsets pending persistence
	filters            []*LogFilter
	done               chan struct{}
	startTailerOnce    sync.Once
	cleanUpFns         []func()
	backpressureFdDrop bool           // FD-release backpressure mode enabled
	buffer             chan *LogEvent // event buffer, non-nil only in FD-release mode
	stopOnce           sync.Once
}

// Verify tailerSrc implements LogSrc
var _ logs.LogSrc = (*tailerSrc)(nil)
// NewTailerSrc creates a tailerSrc for the given tailer and destination
// settings and starts the goroutine that periodically persists the read
// offset to the state file. Tailing itself starts on the first SetOutput.
func NewTailerSrc(
	group, stream, destination, stateFilePath, logClass, fileGlobPath string,
	tailer *tail.Tail,
	autoRemoval bool,
	isMultilineStartFn func(string) bool,
	filters []*LogFilter,
	timestampFn func(string) (time.Time, string),
	enc encoding.Encoding,
	maxEventSize int,
	truncateSuffix string,
	retentionInDays int,
	backpressureMode logscommon.BackpressureMode,
) *tailerSrc {
	// FD-release backpressure never applies when auto removal is on.
	fdRelease := !autoRemoval && backpressureMode == logscommon.LogBackpressureModeFDRelease
	src := &tailerSrc{
		group:              group,
		stream:             stream,
		destination:        destination,
		stateFilePath:      stateFilePath,
		class:              logClass,
		fileGlobPath:       fileGlobPath,
		tailer:             tailer,
		autoRemoval:        autoRemoval,
		isMLStart:          isMultilineStartFn,
		filters:            filters,
		timestampFn:        timestampFn,
		enc:                enc,
		maxEventSize:       maxEventSize,
		truncateSuffix:     truncateSuffix,
		retentionInDays:    retentionInDays,
		backpressureFdDrop: fdRelease,
		offsetCh:           make(chan fileOffset, 2000),
		done:               make(chan struct{}),
	}
	if fdRelease {
		src.buffer = make(chan *LogEvent, defaultBufferSize)
	}
	go src.runSaveState()
	return src
}
// SetOutput registers fn as the sink for log events. The first call with a
// non-nil fn starts the tail loop, plus the sender goroutine when FD-release
// backpressure mode is enabled. A nil fn is ignored.
func (ts *tailerSrc) SetOutput(fn func(logs.LogEvent)) {
	if fn == nil {
		return
	}
	ts.outputFn = fn
	ts.startTailerOnce.Do(func() {
		go ts.runTail()
		if !ts.backpressureFdDrop {
			return
		}
		go ts.runSender()
	})
}
// Group returns the destination log group name.
func (ts *tailerSrc) Group() string {
	return ts.group
}

// Stream returns the destination log stream name.
func (ts *tailerSrc) Stream() string {
	return ts.stream
}

// Description returns the path of the file being tailed.
func (ts *tailerSrc) Description() string {
	return ts.tailer.Filename
}

// Destination returns the name of the configured output destination.
func (ts *tailerSrc) Destination() string {
	return ts.destination
}

// Retention returns the configured log retention in days.
func (ts *tailerSrc) Retention() int {
	return ts.retentionInDays
}

// Class returns the configured log class.
func (ts *tailerSrc) Class() string {
	return ts.class
}
// Done reports that the event ending at offset has been handled, queueing the
// offset to be persisted by runSaveState.
func (ts *tailerSrc) Done(offset fileOffset) {
	// ts.offsetCh will only be blocked when the runSaveState func has exited,
	// which only happens when the original file has been removed, thus making
	// keeping its offset useless. In that case drop the offset instead of
	// blocking the caller.
	select {
	case ts.offsetCh <- offset:
	default:
	}
}
// Stop signals all goroutines of this source to exit. It is safe to call
// multiple times; only the first call closes the channels.
func (ts *tailerSrc) Stop() {
	ts.stopOnce.Do(func() {
		close(ts.done)
		// buffer is non-nil only in FD-release backpressure mode.
		if ts.buffer != nil {
			close(ts.buffer)
		}
	})
}
// AddCleanUpFn registers a callback to run (in registration order) when the
// tail loop exits.
func (ts *tailerSrc) AddCleanUpFn(f func()) {
	ts.cleanUpFns = append(ts.cleanUpFns, f)
}
// Entity builds the CloudWatch Logs entity for this source from the shared
// entity store, keyed by file glob and log group. It returns nil when no
// entity store is available.
func (ts *tailerSrc) Entity() *cloudwatchlogs.Entity {
	es := entitystore.GetEntityStore()
	if es == nil {
		return nil
	}
	return es.CreateLogFileEntity(entitystore.LogFileGlob(ts.fileGlobPath), entitystore.LogGroupName(ts.group))
}
// runTail is the main read loop: it consumes lines from the tailer, decodes
// them with the configured encoding, aggregates them into (possibly
// multiline) events, and publishes each completed event. It exits when the
// tailer's line channel closes or Stop is called, running cleanUp on the way
// out.
func (ts *tailerSrc) runTail() {
	defer ts.cleanUp()
	t := time.NewTicker(multilineWaitPeriod)
	defer t.Stop()
	var init string         // first line of the next event, pending buffer swap
	var msgBuf bytes.Buffer // lines of the event currently being assembled
	var cnt int             // ticker periods the buffer has sat non-empty
	fo := &fileOffset{}
	ignoreUntilNextEvent := false // dropping lines of an oversized event
	for {
		select {
		case line, ok := <-ts.tailer.Lines:
			if !ok {
				// Line channel closed: flush whatever is buffered and exit.
				ts.publishEvent(msgBuf, fo)
				return
			}
			if line.Err != nil {
				log.Printf("E! [logfile] Error tailing line in file %s, Error: %s\n", ts.tailer.Filename, line.Err)
				continue
			}
			text := line.Text
			if ts.enc != nil {
				// Decode the raw line from the configured source encoding.
				var err error
				text, err = ts.enc.NewDecoder().String(text)
				if err != nil {
					log.Printf("E! [logfile] Cannot decode the log file content for %s: %v\n", ts.tailer.Filename, err)
					continue
				}
			}
			if ts.isMLStart == nil {
				// Single-line mode: each line is its own event; publish below.
				msgBuf.Reset()
				msgBuf.WriteString(text)
				fo.SetOffset(line.Offset)
				init = ""
			} else if ts.isMLStart(text) || (!ignoreUntilNextEvent && msgBuf.Len() == 0) {
				// A new multiline event starts: publish the buffered one below
				// and stash this line as the start of the next event.
				init = text
				ignoreUntilNextEvent = false
			} else if ignoreUntilNextEvent || msgBuf.Len() >= ts.maxEventSize {
				// Event already reached maxEventSize: drop continuation lines
				// until the next event start, while still advancing the offset.
				ignoreUntilNextEvent = true
				fo.SetOffset(line.Offset)
				continue
			} else {
				// Continuation line: append to the current event, truncating
				// with the configured suffix when it exceeds maxEventSize.
				msgBuf.WriteString("\n")
				msgBuf.WriteString(text)
				if msgBuf.Len() > ts.maxEventSize {
					msgBuf.Truncate(ts.maxEventSize - len(ts.truncateSuffix))
					msgBuf.WriteString(ts.truncateSuffix)
				}
				fo.SetOffset(line.Offset)
				continue
			}
			// Publish the completed event and seed the buffer with the start
			// of the next one (empty string in single-line mode).
			ts.publishEvent(msgBuf, fo)
			msgBuf.Reset()
			msgBuf.WriteString(init)
			fo.SetOffset(line.Offset)
			cnt = 0
		case <-t.C:
			// Flush a partially-built event after it has been idle for ~5
			// ticker periods, so the last event of a burst is not held forever.
			if msgBuf.Len() > 0 {
				cnt++
			}
			if cnt >= 5 {
				ts.publishEvent(msgBuf, fo)
				msgBuf.Reset()
				cnt = 0
			}
		case <-ts.done:
			return
		}
	}
}
// publishEvent turns the buffered message into a LogEvent and hands it to the
// output function — or, in FD-release backpressure mode, to the sender
// buffer. If the sender buffer stays full longer than tailCloseThreshold, the
// tailed file is closed to release its descriptor, then reopened once the
// event is finally accepted. An empty buffer is a no-op.
// Note: msgBuf is received by value and only read here; fo is copied into the
// event so later mutation by the caller does not affect it.
func (ts *tailerSrc) publishEvent(msgBuf bytes.Buffer, fo *fileOffset) {
	// helper to handle event publishing
	if msgBuf.Len() == 0 {
		return
	}
	msg := msgBuf.String()
	// timestampFn extracts the event timestamp and returns the (possibly
	// modified) message text to send.
	timestamp, modifiedMsg := ts.timestampFn(msg)
	e := &LogEvent{
		msg:    modifiedMsg,
		t:      timestamp,
		offset: *fo,
		src:    ts,
	}
	if ShouldPublish(ts.group, ts.stream, ts.filters, e) {
		if ts.backpressureFdDrop {
			select {
			case ts.buffer <- e:
				// successfully sent
			case <-ts.done:
				return
			default:
				// sender buffer is full. start timer to close file then retry
				timer := time.NewTimer(tailCloseThreshold)
				defer timer.Stop()
				for {
					select {
					case ts.buffer <- e:
						// sent event after buffer gets freed up; reopen the
						// file only if the timer branch below already closed it
						if ts.tailer.IsFileClosed() {
							if err := ts.tailer.Reopen(false); err != nil {
								log.Printf("E! [logfile] error reopening file %s: %v", ts.tailer.Filename, err)
							}
						}
						return
					case <-timer.C:
						// timer expired without successful send, close file to
						// release the descriptor; keep retrying the send
						log.Printf("D! [logfile] tailer sender buffer blocked after retrying, closing file %v", ts.tailer.Filename)
						ts.tailer.CloseFile()
					case <-ts.done:
						return
					}
				}
			}
		} else {
			ts.outputFn(e)
		}
	}
}
// runSender drains the backpressure buffer and forwards events to outputFn.
// It runs only in FD-release backpressure mode and exits when the buffer is
// closed or the done channel is signaled.
func (ts *tailerSrc) runSender() {
	log.Printf("D! [logfile] runSender starting for %s", ts.tailer.Filename)
	for {
		select {
		case e, ok := <-ts.buffer:
			if !ok { // buffer was closed
				log.Printf("D! [logfile] runSender buffer was closed for %s", ts.tailer.Filename)
				return
			}
			// Re-check done before forwarding so an event received in the
			// same select round as Stop is not sent downstream.
			select {
			case <-ts.done:
				return
			default:
				if e != nil {
					ts.outputFn(e)
				}
			}
		case <-ts.done:
			log.Printf("D! [logfile] runSender received done signal for %s", ts.tailer.Filename)
			return
		}
	}
}
// cleanUp runs when the tail loop exits: it removes the source file when
// auto_removal is enabled, invokes any registered cleanup callbacks in order,
// and emits a nil event to tell the logs agent this source is done.
func (ts *tailerSrc) cleanUp() {
	if ts.autoRemoval {
		err := os.Remove(ts.tailer.Filename)
		if err == nil {
			log.Printf("I! [logfile] Successfully removed file %v with auto_removal feature", ts.tailer.Filename)
		} else {
			log.Printf("W! [logfile] Failed to auto remove file %v: %v", ts.tailer.Filename, err)
		}
	}
	for _, clf := range ts.cleanUpFns {
		clf()
	}
	if ts.outputFn != nil {
		ts.outputFn(nil) // inform logs agent the tailer src's exit, to stop runSrcToDest
	}
}
// runSaveState tracks the highest acknowledged offset arriving on offsetCh
// and persists it to the state file every 100ms when it has changed. When the
// source file is deleted it removes the state file and exits; on Stop it
// performs one final save before exiting.
func (ts *tailerSrc) runSaveState() {
	t := time.NewTicker(100 * time.Millisecond)
	defer t.Stop()
	var offset, lastSavedOffset fileOffset
	for {
		select {
		case o := <-ts.offsetCh:
			// Keep the largest offset seen, ordered by (seq, offset) so that
			// offsets recorded after a truncation win over pre-truncation ones.
			if o.seq > offset.seq || (o.seq == offset.seq && o.offset > offset.offset) {
				offset = o
			}
		case <-t.C:
			// Only write when the offset actually advanced since the last save.
			if offset == lastSavedOffset {
				continue
			}
			err := ts.saveState(offset.offset)
			if err != nil {
				log.Printf("E! [logfile] Error happened when saving file state %s to file state folder %s: %v", ts.tailer.Filename, ts.stateFilePath, err)
				continue
			}
			lastSavedOffset = offset
		case <-ts.tailer.FileDeletedCh:
			// Source file is gone, so its saved offset is useless; delete the
			// state file and stop persisting.
			log.Printf("W! [logfile] deleting state file %s", ts.stateFilePath)
			err := os.Remove(ts.stateFilePath)
			if err != nil {
				log.Printf("W! [logfile] Error happened while deleting state file %s on cleanup: %v", ts.stateFilePath, err)
			}
			return
		case <-ts.done:
			// Best-effort final save; on failure duplicates may be re-sent on
			// the next agent start.
			err := ts.saveState(offset.offset)
			if err != nil {
				log.Printf("E! [logfile] Error happened during final file state saving of logfile %s to file state folder %s, duplicate log maybe sent at next start: %v", ts.tailer.Filename, ts.stateFilePath, err)
			}
			return
		}
	}
}
// saveState writes the current read offset and the tailed file name to the
// state file so tailing can resume from the same position after a restart.
// It is a no-op when no state file is configured or the offset is zero.
func (ts *tailerSrc) saveState(offset int64) error {
	if offset == 0 || ts.stateFilePath == "" {
		return nil
	}
	state := strconv.FormatInt(offset, 10) + "\n" + ts.tailer.Filename
	return os.WriteFile(ts.stateFilePath, []byte(state), stateFileMode)
}