// pkg/helper/log_file_reader.go
// Copyright 2021 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package helper
import (
"context"
"io"
"os"
"sync"
"time"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/selfmonitor"
"github.com/alibaba/ilogtail/pkg/util"
)
// ReaderMetricTracker bundles the self-monitor counters and the latency
// metric that a LogFileReader updates while tailing a file.
type ReaderMetricTracker struct {
	OpenCounter        selfmonitor.CounterMetric // file open operations (see ReadOpen)
	CloseCounter       selfmonitor.CounterMetric // file close operations (see CloseFile)
	FileSizeCounter    selfmonitor.CounterMetric // registered as "file_size"; not updated in this file
	FileRotatorCounter selfmonitor.CounterMetric // detected rotations/truncations
	ReadCounter        selfmonitor.CounterMetric // number of ReadAt calls issued
	ReadSizeCounter    selfmonitor.CounterMetric // total bytes returned by ReadAt
	ProcessLatency     selfmonitor.LatencyMetric // duration of one Run loop iteration
}
// NewReaderMetricTracker creates a ReaderMetricTracker with all of its
// counters and the latency metric registered on the given metrics record.
func NewReaderMetricTracker(mr *selfmonitor.MetricsRecord) *ReaderMetricTracker {
	tracker := &ReaderMetricTracker{}
	tracker.OpenCounter = selfmonitor.NewCounterMetricAndRegister(mr, "open_count")
	tracker.CloseCounter = selfmonitor.NewCounterMetricAndRegister(mr, "close_count")
	tracker.FileSizeCounter = selfmonitor.NewCounterMetricAndRegister(mr, "file_size")
	tracker.FileRotatorCounter = selfmonitor.NewCounterMetricAndRegister(mr, "file_rotate")
	tracker.ReadCounter = selfmonitor.NewCounterMetricAndRegister(mr, "read_count")
	tracker.ReadSizeCounter = selfmonitor.NewCounterMetricAndRegister(mr, "read_size")
	tracker.ProcessLatency = selfmonitor.NewLatencyMetricAndRegister(mr, "log_process_latency")
	return tracker
}
// LogFileReaderConfig controls the polling cadence, read buffer size,
// idle-file close timeout and optional metric tracking of a LogFileReader.
type LogFileReaderConfig struct {
	ReadIntervalMs   int                  // interval between poll iterations, in milliseconds
	MaxReadBlockSize int                  // size of the reusable read buffer (nowBlock), in bytes
	CloseFileSec     int                  // close the file handle after this many seconds without a read
	Tracker          *ReaderMetricTracker // optional metrics; nil disables tracking
}
// DefaultLogFileReaderConfig is the default reader configuration: poll once
// per second, read up to 512 KiB per block, close idle handles after 60
// seconds, and collect no metrics.
var DefaultLogFileReaderConfig = LogFileReaderConfig{
	ReadIntervalMs:   1000,
	MaxReadBlockSize: 512 * 1024,
	CloseFileSec:     60,
	Tracker:          nil,
}
// LogFileReaderCheckPoint records how far a file has been consumed so that
// reading can resume after a restart or a file rotation.
type LogFileReaderCheckPoint struct {
	Path   string  // path of the file being tailed
	Offset int64   // byte offset of the next unprocessed data
	State  StateOS // OS-level snapshot of the file (size, modify time, identity)
}
// IsSame reports whether the two checkpoints describe the same position in
// the same underlying file: equal path, equal offset, and an OS state that
// has not changed.
func (checkpoint *LogFileReaderCheckPoint) IsSame(other *LogFileReaderCheckPoint) bool {
	return checkpoint.Path == other.Path &&
		checkpoint.Offset == other.Offset &&
		!checkpoint.State.IsChange(other.State)
}
// LogFileReader tails a single log file: it polls for changes, reads data
// into a reusable block buffer, feeds newline-terminated chunks to a
// LogFileProcessor, and maintains a resumable checkpoint. Start launches
// the Run goroutine; Stop shuts it down and waits for it to finish.
type LogFileReader struct {
	Config         LogFileReaderConfig
	file           *os.File                // open read handle; nil while closed
	lastCheckpoint LogFileReaderCheckPoint // snapshot taken by the previous GetCheckpoint call
	checkpoint     LogFileReaderCheckPoint // current read position; guarded by checkpointLock
	processor      LogFileProcessor        // consumer of the raw log bytes
	nowBlock       []byte                  // read buffer of Config.MaxReadBlockSize bytes; allocated lazily
	lastBufferSize int                     // bytes at the head of nowBlock not yet accepted by the processor
	lastBufferTime time.Time               // when the pending buffered bytes were last touched
	readWhenStart  bool                    // force a read on the first Run iteration
	checkpointLock sync.Mutex              // protects checkpoint and lastCheckpoint
	shutdown       chan struct{}           // closed by Stop to terminate Run
	logContext     context.Context         // context passed to the logger calls
	waitgroup      sync.WaitGroup          // waits for the Run goroutine to exit
	foundFile      bool                    // last stat found the file; debounces not-exist warnings
}
// NewLogFileReader builds a reader for the file described by checkpoint.
// An empty checkpoint state is filled in from a fresh stat of the path; a
// file modified within the last 180 seconds is scheduled for an immediate
// first read. The returned error is currently always nil.
func NewLogFileReader(context context.Context, checkpoint LogFileReaderCheckPoint, config LogFileReaderConfig, processor LogFileProcessor) (*LogFileReader, error) {
	foundFile := true
	if checkpoint.State.IsEmpty() {
		newStat, err := os.Stat(checkpoint.Path)
		if err != nil {
			if os.IsNotExist(err) {
				foundFile = false
			}
			logger.Warning(context, "STAT_FILE_ALARM", "stat file error when create reader, file", checkpoint.Path, "error", err.Error())
		} else {
			checkpoint.State = GetOSState(newStat)
		}
	}
	readWhenStart := false
	if !checkpoint.State.IsEmpty() {
		// read immediately when the file was modified within the last 180s
		deltaNano := time.Now().UnixNano() - int64(checkpoint.State.ModifyTime)
		if deltaNano >= 0 && deltaNano < 180*1e9 {
			readWhenStart = true
			logger.Info(context, "read file", checkpoint.Path, "first read", readWhenStart)
		} else {
			logger.Info(context, "read file", checkpoint.Path, "since offset", checkpoint.Offset)
		}
	}
	reader := &LogFileReader{
		Config:         config,
		checkpoint:     checkpoint,
		processor:      processor,
		logContext:     context,
		nowBlock:       nil,
		lastBufferSize: 0,
		readWhenStart:  readWhenStart,
		foundFile:      foundFile,
	}
	return reader, nil
}
// GetLastEndOfLine returns how many of the n newly read bytes may be handed
// to the processor so that the delivered data ends with '\n'. When the
// buffer is completely full (n + lastBufferSize == len(nowBlock)) the whole
// read is accepted regardless of a trailing newline; when no newline exists
// among the new bytes it returns 0.
func (r *LogFileReader) GetLastEndOfLine(n int) int {
	total := r.lastBufferSize + n
	// buffer completely full: hand everything over to avoid stalling
	if total == len(r.nowBlock) {
		return n
	}
	// scan the new bytes backwards for the last newline
	for i := total; i > r.lastBufferSize; i-- {
		if r.nowBlock[i-1] == '\n' {
			return i - r.lastBufferSize
		}
	}
	return 0
}
// CheckFileChange stats the tracked path (falling back to the open file
// handle when the path is no longer accessible) and refreshes the stored
// checkpoint state. It returns true when the file changed — grew, was
// rotated, or otherwise differs from the recorded state — meaning a read
// should follow. Rotation (dev/inode change) additionally drains the old
// handle, flushes any pending buffer and resets the offset to 0.
func (r *LogFileReader) CheckFileChange() bool {
	switch newStat, err := os.Stat(r.checkpoint.Path); {
	case err == nil: // stat by filename to check if size changed or file changed
		newOsStat := GetOSState(newStat)
		// logger.Debug("check file change", newOsStat.String())
		if r.checkpoint.State.IsChange(newOsStat) {
			needResetOffset := false
			if r.checkpoint.State.IsFileChange(newOsStat) {
				// the path now points at a different file (rotation)
				needResetOffset = true
				logger.Info(r.logContext, "file dev inode changed, read to end and force read from beginning, file", r.checkpoint.Path, "old", r.checkpoint.State.String(), "new", newOsStat.String(), "offset", r.checkpoint.Offset)
				// drain the remaining data from the old handle, then close it
				if r.file != nil {
					r.ReadAndProcess(false)
					r.CloseFile("open file and dev inode changed")
				}
				if r.Config.Tracker != nil {
					r.Config.Tracker.FileRotatorCounter.Add(1)
				}
				// rotation invalidates the pending buffer: force flush it now
				if r.lastBufferSize > 0 {
					processSize := r.processor.Process(r.nowBlock[0:r.lastBufferSize], time.Hour)
					r.UpdateProcessResult(0, processSize)
				}
			}
			r.checkpointLock.Lock()
			r.checkpoint.State = newOsStat
			if needResetOffset {
				// start reading the new file from the beginning
				r.checkpoint.Offset = 0
			}
			r.checkpointLock.Unlock()
			return true
		}
		// stat succeeded and nothing changed: the file exists again
		r.foundFile = true
	case r.file != nil: // Fallback to stat by file handle to check if size changed. This is necessary because the file path may become inaccessible in certain scenarios, such as when a container is stopped, but the file handle remains valid.
		if newStat, statErr := r.file.Stat(); statErr == nil {
			newOsStat := GetOSState(newStat)
			// logger.Debug("check file change", newOsStat.String())
			if r.checkpoint.State.IsChange(newOsStat) {
				r.checkpointLock.Lock()
				r.checkpoint.State = newOsStat
				r.checkpointLock.Unlock()
				return true
			}
		}
	default:
		if os.IsNotExist(err) {
			// warn only once per disappearance, debounced via foundFile
			if r.foundFile {
				logger.Warning(r.logContext, "STAT_FILE_ALARM", "stat file error, file", r.checkpoint.Path, "error", err.Error())
				r.foundFile = false
			}
		} else {
			logger.Warning(r.logContext, "STAT_FILE_ALARM", "stat file error, file", r.checkpoint.Path, "error", err.Error())
		}
	}
	return false
}
// GetProcessor returns the LogFileProcessor this reader feeds.
func (r *LogFileReader) GetProcessor() LogFileProcessor {
	return r.processor
}
// GetCheckpoint returns a copy of the current checkpoint together with a
// flag reporting whether the checkpoint has changed since the previous
// call. The snapshot used for that comparison is refreshed inside the
// deferred function, i.e. only after the comparison has been evaluated.
func (r *LogFileReader) GetCheckpoint() (checkpoint LogFileReaderCheckPoint, updateFlag bool) {
	r.checkpointLock.Lock()
	defer func() {
		// remember what we handed out so the next call can detect changes
		r.lastCheckpoint = r.checkpoint
		r.checkpointLock.Unlock()
	}()
	// Compare against the snapshot from the previous call BEFORE it is
	// refreshed. The old code overwrote lastCheckpoint first and returned
	// the un-negated IsSame result, so the flag was always true and change
	// detection was impossible.
	return r.checkpoint, !r.lastCheckpoint.IsSame(&r.checkpoint)
}
// UpdateProcessResult bookkeeps the outcome of one processor call. readN is
// the number of newly read bytes appended after the pending buffer and
// processedN is how many bytes (from the front of nowBlock) the processor
// consumed. Unconsumed bytes are shifted to the head of the buffer and the
// checkpoint offset advances by processedN.
func (r *LogFileReader) UpdateProcessResult(readN, processedN int) {
	total := readN + r.lastBufferSize
	switch {
	case processedN == total:
		// everything consumed: the buffer is empty again
		r.lastBufferSize = 0
		r.lastBufferTime = time.Now()
	case processedN != 0:
		// partially consumed: slide the remainder to the buffer head
		copy(r.nowBlock, r.nowBlock[processedN:total])
		r.lastBufferTime = time.Now()
		r.lastBufferSize = total - processedN
	default:
		// nothing consumed: only account for the newly buffered bytes
		r.lastBufferSize = total
	}
	r.checkpointLock.Lock()
	defer r.checkpointLock.Unlock()
	r.checkpoint.Offset += int64(processedN)
}
// ProcessAfterRead hands the buffered data (pending bytes plus n newly read
// bytes) to the processor and records how much of it was accepted. n == 0
// means no new data arrived: the processor then sees the age of the pending
// buffer so it can flush on timeout; with an empty buffer it still gets a
// heartbeat call with an empty slice.
func (r *LogFileReader) ProcessAfterRead(n int) {
	total := r.lastBufferSize + n
	if total == 0 {
		// nothing pending at all: heartbeat with empty data
		r.processor.Process(r.nowBlock[0:0], time.Since(r.lastBufferTime))
		return
	}
	// fresh data is delivered with zero wait; stale buffered data carries
	// the time elapsed since it was buffered
	wait := time.Duration(0)
	if n == 0 {
		wait = time.Since(r.lastBufferTime)
	}
	processedSize := r.processor.Process(r.nowBlock[0:total], wait)
	if processedSize > total {
		// defensive clamp: the processor cannot accept more than it was given
		processedSize = total
	}
	r.UpdateProcessResult(n, processedSize)
}
// ReadOpen opens the checkpoint path for reading if no handle is currently
// held; it is a no-op returning nil when the file is already open. The open
// counter is bumped and a debug line logged on every open attempt, even a
// failed one.
func (r *LogFileReader) ReadOpen() error {
	if r.file != nil {
		return nil
	}
	var err error
	r.file, err = ReadOpen(r.checkpoint.Path)
	if tracker := r.Config.Tracker; tracker != nil {
		tracker.OpenCounter.Add(1)
	}
	logger.Debug(r.logContext, "open file for read, file", r.checkpoint.Path, "offset", r.checkpoint.Offset, "status", r.checkpoint.State)
	return err
}
// ReadAndProcess opens the file if needed and reads from the checkpoint
// offset into nowBlock, handing newline-terminated data to the processor.
// Before reading it re-stats the open handle to catch rotation (dev/inode
// change) and truncation; both cases force-flush the pending buffer, reset
// the checkpoint and close the handle. With once==true (used on shutdown)
// it performs at most a single read pass and skips lazy buffer allocation.
func (r *LogFileReader) ReadAndProcess(once bool) {
	if r.nowBlock == nil {
		// once only be true when shutdown, and we don't need to init r.nowBlock when shutdown because this file is never readed
		if once {
			return
		}
		// lazy init
		r.nowBlock = make([]byte, r.Config.MaxReadBlockSize)
	}
	if err := r.ReadOpen(); err == nil {
		file := r.file
		// double check
		if newStat, statErr := file.Stat(); statErr == nil {
			newOsStat := GetOSState(newStat)
			// logger.Debug("check file dev inode, file", r.checkpoint.Path, "old", r.checkpoint.State.String(), "new", newOsStat.String(), "offset", r.checkpoint.Offset)
			// check file dev+inode changed
			if r.checkpoint.State.IsFileChange(newOsStat) {
				logger.Info(r.logContext, "file dev inode changed, force read from beginning, file", r.checkpoint.Path, "old", r.checkpoint.State.String(), "new", newOsStat.String(), "offset", r.checkpoint.Offset)
				// rotation invalidates the pending buffer: force flush it
				if r.lastBufferSize > 0 {
					processSize := r.processor.Process(r.nowBlock[0:r.lastBufferSize], time.Hour)
					r.UpdateProcessResult(0, processSize)
				}
				if r.Config.Tracker != nil {
					r.Config.Tracker.FileRotatorCounter.Add(1)
				}
				// restart the new file from the beginning
				r.checkpointLock.Lock()
				r.checkpoint.Offset = 0
				r.checkpoint.State = newOsStat
				r.checkpointLock.Unlock()
				r.CloseFile("file changed(rotate)")
				return
			}
			// check file truncated
			if newOsStat.Size < r.checkpoint.Offset {
				logger.Info(r.logContext, "file truncated, force read from beginning, file", r.checkpoint.Path, "old", r.checkpoint.State.String(), "new", newOsStat.String(), "offset", r.checkpoint.Offset)
				// truncation invalidates the pending buffer: force flush it
				if r.lastBufferSize > 0 {
					processSize := r.processor.Process(r.nowBlock[0:r.lastBufferSize], time.Hour)
					r.UpdateProcessResult(0, processSize)
				}
				if r.Config.Tracker != nil {
					r.Config.Tracker.FileRotatorCounter.Add(1)
				}
				r.checkpointLock.Lock()
				// small files restart from 0; large files restart 1 MiB
				// before the end to bound the amount of re-read data
				if newOsStat.Size < 10*1024*1024 {
					r.checkpoint.Offset = 0
				} else {
					r.checkpoint.Offset = newOsStat.Size - 1024*1024
				}
				r.checkpoint.State = newOsStat
				r.checkpointLock.Unlock()
				r.CloseFile("file changed(truncate)")
				return
			}
		} else {
			logger.Warning(r.logContext, "STAT_FILE_ALARM", "stat file error, file", r.checkpoint.Path, "error", statErr.Error())
		}
		for {
			// append new bytes after the pending lastBufferSize bytes; the
			// file position is the checkpoint offset plus what is buffered
			n, readErr := file.ReadAt(r.nowBlock[r.lastBufferSize:], int64(r.lastBufferSize)+r.checkpoint.Offset)
			needBreak := false
			if r.Config.Tracker != nil {
				r.Config.Tracker.ReadCounter.Add(1)
				r.Config.Tracker.ReadSizeCounter.Add(int64(n))
			}
			// a short read means we caught up; once limits us to one pass
			if once || n < r.Config.MaxReadBlockSize-r.lastBufferSize {
				needBreak = true
			}
			if readErr != nil {
				if readErr != io.EOF {
					logger.Warning(r.logContext, "READ_FILE_ALARM", "read file error, file", r.checkpoint.Path, "error", readErr.Error())
					break
				}
				logger.Debug(r.logContext, "read end of file", r.checkpoint.Path, "offset", r.checkpoint.Offset, "last buffer size", r.lastBufferSize, "read n", n, "stat", r.checkpoint.State.String())
				needBreak = true
			}
			// only accept buffer end of '\n'
			n = r.GetLastEndOfLine(n)
			// logger.Debug("after check last end of line, n", n)
			r.ProcessAfterRead(n)
			if !needBreak {
				// check shutdown between full blocks
				select {
				case <-r.shutdown:
					logger.Info(r.logContext, "receive stop signal when read data, path", r.checkpoint.Path)
					needBreak = true
				default:
				}
			}
			if needBreak {
				break
			}
		}
	} else {
		logger.Warning(r.logContext, "READ_FILE_ALARM", "open file for read error, file", r.checkpoint.Path, "error", err.Error())
	}
}
// CloseFile closes the current file handle, if any, bumping the close
// counter and logging the given reason. It is safe to call when no file is
// open.
func (r *LogFileReader) CloseFile(reason string) {
	if r.file == nil {
		return
	}
	_ = r.file.Close()
	r.file = nil
	if tracker := r.Config.Tracker; tracker != nil {
		tracker.CloseCounter.Add(1)
	}
	logger.Debug(r.logContext, "close file, reason", reason, "file", r.checkpoint.Path, "offset", r.checkpoint.Offset, "status", r.checkpoint.State)
}
// Run is the reader's poll loop, launched by Start. Each iteration either
// reads the file (on first start or when CheckFileChange reports a change)
// or gives the processor a heartbeat so buffered data can age out, closing
// the handle after Config.CloseFileSec without reads. When the shutdown
// channel fires it performs one final bounded read, force-flushes the
// pending buffer and exits.
func (r *LogFileReader) Run() {
	defer func() {
		r.CloseFile("run done")
		r.waitgroup.Done()
		panicRecover(r.logContext, r.checkpoint.Path)
	}()
	lastReadTime := time.Now()
	tracker := r.Config.Tracker
	for {
		startProcessTime := time.Now()
		if r.readWhenStart || r.CheckFileChange() {
			r.readWhenStart = false
			r.ReadAndProcess(false)
			lastReadTime = startProcessTime
		} else {
			// no new data: heartbeat so the processor can flush on timeout
			r.ProcessAfterRead(0)
			// release idle handles so rotated/deleted files can be reclaimed
			if startProcessTime.Sub(lastReadTime) > time.Second*time.Duration(r.Config.CloseFileSec) {
				r.CloseFile("no read timeout")
				lastReadTime = startProcessTime
			}
		}
		if tracker != nil {
			tracker.ProcessLatency.Observe(float64(time.Since(startProcessTime)))
		}
		endProcessTime := time.Now()
		// sleep the remainder of the configured interval; RandomSleep is
		// given r.shutdown and a true return is treated as shutdown here
		sleepDuration := time.Millisecond*time.Duration(r.Config.ReadIntervalMs) - endProcessTime.Sub(startProcessTime)
		// logger.Debug(r.logContext, "sleep duration", sleepDuration, "normal", r.Config.ReadIntervalMs, "path", r.checkpoint.Path)
		if util.RandomSleep(sleepDuration, 0.1, r.shutdown) {
			// shutdown path: one last bounded read, then force-flush any
			// bytes still sitting in the local buffer
			r.ReadAndProcess(true)
			if r.lastBufferSize > 0 {
				processSize := r.processor.Process(r.nowBlock[0:r.lastBufferSize], time.Hour)
				r.UpdateProcessResult(0, processSize)
			}
			// [to #37527076] force flush last log buffer in processor
			r.processor.Process(r.nowBlock[0:0], time.Hour)
			break
		}
	}
}
// SetForceRead forces the file to be read on the first Run iteration
// regardless of its modify time; intended to be called before Start.
func (r *LogFileReader) SetForceRead() {
	r.readWhenStart = true
}
// Start creates a fresh shutdown channel and launches the Run goroutine.
// Pair every Start with exactly one Stop.
func (r *LogFileReader) Start() {
	r.shutdown = make(chan struct{})
	r.waitgroup.Add(1)
	go r.Run()
}
// Stop signals Run to exit by closing the shutdown channel and blocks
// until the goroutine has finished its final flush.
func (r *LogFileReader) Stop() {
	close(r.shutdown)
	r.waitgroup.Wait()
}