collector/logs/sources/tail/tail.go (197 lines of code) (raw):

package tail import ( "context" "encoding/json" "fmt" "os" "path/filepath" "strings" "sync" "github.com/Azure/adx-mon/collector/logs/engine" "github.com/Azure/adx-mon/collector/logs/sources/tail/sourceparse" "github.com/Azure/adx-mon/collector/logs/types" "github.com/Azure/adx-mon/pkg/logger" ) const ( cursor_position = "adxmon_cursor_position" cursor_file_id = "adxmon_cursor_file_id" cursor_file_name = "adxmon_cursor_file_name" ) var ( noopAck = func() {} noopAckGenerator = func(*types.Log) func() { return noopAck } ) // FileTailTarget describes a file to tail, how to parse it, and the destination for the parsed logs. // It is used in the list of StaticTargets in TailSourceConfig, or used in AddTarget. // NOTE: If new fields are added that change during runtime (e.g. from pod metadata changing) that are sent // via UpdateChan, update isTargetChanged to take this new field into account. type FileTailTarget struct { // FilePath is the file to tail. FilePath string // LogType is the format of the log file. e.g. docker for logs written by the docker json driver. // This provides a mechanism to combine split lines, parse timestamps (if present), and extract a message field. // Defaults to plain. LogType sourceparse.Type // The destination database name. Populated into the databasename attribute of the log that can be overwritten by transforms. Database string // The destination table name. Populated into the tablename attribute of the log that can be overwritten by transforms. Table string // Parsers is a list of parsers names to apply to each line of the log file to attempt to extract fields from the message body. // These are run sequentially until one succeeds, or until all have been tried. // These are converted into parser.ParserType. Parsers []string // Resources is a map of additional resource k/v pairs to add to each log. Resources map[string]interface{} } // TailSourceConfig configures TailSource. type TailSourceConfig struct { StaticTargets []FileTailTarget CursorDirectory string WorkerCreator engine.WorkerCreatorFunc // TODO mkeesey - TailSource should not need to manage poddiscovery service lifecycle. // However, creation of TailSource happens with a create method to access store, making this // wiring difficult. PodDiscoveryOpts *PodDiscoveryOpts } // TailSource implements the types.Source interface for tailing files. type TailSource struct { staticTargets []FileTailTarget cursorDirectory string workerCreator engine.WorkerCreatorFunc ackGenerator func(*types.Log) func() tailers map[string]*Tailer // protects tailers mu sync.RWMutex podDiscovery *PodDiscovery } func NewTailSource(config TailSourceConfig) (*TailSource, error) { ts := &TailSource{ staticTargets: config.StaticTargets, cursorDirectory: config.CursorDirectory, workerCreator: config.WorkerCreator, } if config.PodDiscoveryOpts != nil { ts.podDiscovery = NewPodDiscovery(*config.PodDiscoveryOpts, ts) } return ts, nil } func (s *TailSource) Open(ctx context.Context) error { s.ackGenerator = noopAckGenerator if s.cursorDirectory != "" { s.ackGenerator = func(log *types.Log) func() { cursorFileName := types.StringOrEmpty(log.GetAttributeValue(cursor_file_name)) cursorFileId := types.StringOrEmpty(log.GetAttributeValue(cursor_file_id)) cursorPositionVal, ok := log.GetAttributeValue(cursor_position) if !ok || cursorFileName == "" || cursorFileId == "" { return noopAck } cursorPosition, ok := cursorPositionVal.(int64) if !ok { return noopAck } return func() { s.ackBatch(cursorFileName, cursorFileId, cursorPosition) } } } s.tailers = map[string]*Tailer{} for _, target := range s.staticTargets { target := target err := s.AddTarget(target, nil) if err != nil { // On startup, if we fail to add a target, we should close all the tailers we've opened so far and return. s.Close() return fmt.Errorf("TailSource open: %w", err) } } if s.podDiscovery != nil { err := s.podDiscovery.Open(ctx) if err != nil { // On startup, if we fail to open the pod discovery, we should close all the tailers we've opened so far and return. s.Close() return fmt.Errorf("TailSource open: %w", err) } } return nil } func (s *TailSource) Close() error { if s.podDiscovery != nil { s.podDiscovery.Close() } s.mu.Lock() for _, t := range s.tailers { t.Stop() } for _, t := range s.tailers { t.Wait() } clear(s.tailers) s.mu.Unlock() return nil } func (s *TailSource) Name() string { return "tailsource" } // AddTarget adds a new file to tail. // updateChan is an optional channel to provide updated FileTailTarget metadata during runtime. // Does not support updating FilePath or LogType. func (s *TailSource) AddTarget(target FileTailTarget, updateChan <-chan FileTailTarget) error { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.tailers[target.FilePath]; ok { return nil // already exists } tailerConfig := TailerConfig{ Target: target, UpdateChan: updateChan, AckGenerator: s.ackGenerator, WorkerCreator: s.workerCreator, CursorDirectory: s.cursorDirectory, WorkerName: s.Name(), } tailer, err := StartTailing(tailerConfig) if err != nil { return fmt.Errorf("AddTarget: %w", err) } s.tailers[target.FilePath] = tailer return nil } // RemoveTarget removes a file from being tailed. // This also removes the cursor file, so this should only be called when the file is not longer expected to be tailed in the future. func (s *TailSource) RemoveTarget(filePath string) { s.mu.Lock() defer s.mu.Unlock() tailer, ok := s.tailers[filePath] if ok { tailer.Stop() tailer.Wait() delete(s.tailers, filePath) cleanCursor(cursorPath(s.cursorDirectory, filePath)) } } func (s *TailSource) ackBatch(cursor_file_name, cursor_file_id string, cursor_position int64) { cursorPath := cursorPath(s.cursorDirectory, cursor_file_name) err := writeCursor(cursorPath, cursor_file_id, cursor_position) if err != nil { logger.Errorf("ackBatches: %s", err) } } type tailcursor struct { FID string `json:"fid"` Cursor int64 `json:"cursor"` } func cursorPath(cursorDirectory string, filename string) string { baseName := strings.ReplaceAll(filename, string(filepath.Separator), "_") cursorFileName := fmt.Sprintf("%s.cursor", baseName) return filepath.Join(cursorDirectory, cursorFileName) } func writeCursor(cursorPath string, file_id string, cursor int64) error { cursorVal := fmt.Sprintf("{\"fid\":\"%s\",\"cursor\":%d}\n", file_id, cursor) output, err := os.Create(cursorPath) if err != nil { return fmt.Errorf("writeCursor: failed to create/truncate cursor file %q: %w", cursorPath, err) } defer output.Close() _, err = output.WriteString(cursorVal) if err != nil { return fmt.Errorf("writeCursor: failed to write cursor file %q: %w", cursorPath, err) } return nil } func readCursor(cursorPath string) (string, int64, error) { file, err := os.Open(cursorPath) if err != nil { return "", 0, fmt.Errorf("readCursor: failed to open cursor file %q: %w", cursorPath, err) } defer file.Close() var line string _, err = fmt.Fscanln(file, &line) if err != nil { return "", 0, fmt.Errorf("readCursor: failed to read cursor file %q: %w", cursorPath, err) } tailcursor := tailcursor{} err = json.Unmarshal([]byte(line), &tailcursor) if err != nil { return "", 0, fmt.Errorf("readCursor: failed to unmarshal cursor file %q: %w", cursorPath, err) } return tailcursor.FID, tailcursor.Cursor, nil } func cleanCursor(cursorPath string) { err := os.Remove(cursorPath) if err != nil { logger.Errorf("cleanCursor: failed to remove cursor file %q: %v", cursorPath, err) } }