internal/pkg/agent/cmd/logs.go
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package cmd

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"time"
"github.com/fatih/color"
"github.com/spf13/cobra"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/cli"
"github.com/elastic/elastic-agent/pkg/core/logger"
)

const (
// logBufferSize is the size (1 KiB) of the file chunk we read while
// searching for line breaks from the end of the file
logBufferSize = 1024
// watchInterval is how often, while following logs, we check the current
// log file for updates and the log directory for newly created files
watchInterval = 500 * time.Millisecond
)
var (
logFilePattern = regexp.MustCompile(`elastic-agent(-event-log)?-(\d+)(-\d+)?\.ndjson$`)
errLineFiltered = errors.New("this line was filtered out")
)
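
// For illustration, logFilePattern matches rotated log file names such as the
// following (capture groups shown as [g1, g2, g3]):
//
//	elastic-agent-20230515.ndjson           => ["", "20230515", ""]
//	elastic-agent-20230515-1.ndjson         => ["", "20230515", "-1"]
//	elastic-agent-event-log-20230515.ndjson => ["-event-log", "20230515", ""]
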
// filterFunc is a per-line filter; it returns true if the line should be printed.
type filterFunc func([]byte) bool
// modifierFunc transforms a log line and returns the modified message.
// If the modification does anything other than replace characters in place,
// it must allocate a new byte slice for the result.
type modifierFunc func([]byte) []byte
// logEntry represents the subset of an Elastic Agent log entry that this command inspects
type logEntry struct {
Component struct {
ID string `json:"id"`
} `json:"component"`
LogLevel string `json:"log.level"`
}
// createComponentFilter creates a log entry filter that keeps only the
// log lines produced by the component with the given ID.
func createComponentFilter(id string) filterFunc {
return func(entry []byte) bool {
var e logEntry
err := json.Unmarshal(entry, &e)
if err != nil {
return false
}
return e.Component.ID == id
}
}
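
// A minimal usage sketch (the component ID below is hypothetical):
//
//	filter := createComponentFilter("filestream-default")
//	filter([]byte(`{"component":{"id":"filestream-default"}}`)) // true
//	filter([]byte(`{"component":{"id":"system/metrics"}}`))     // false
//	filter([]byte(`not json`))                                  // false: unparsable lines are dropped

// addColorModifier colorizes a log line according to its log level.
// Lines that cannot be parsed as JSON log entries are returned unchanged.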
func addColorModifier(entry []byte) []byte {
var e logEntry
err := json.Unmarshal(entry, &e)
if err != nil {
return entry
}
switch strings.ToLower(e.LogLevel) {
case logp.InfoLevel.String():
return []byte(color.CyanString(string(entry)))
case logp.WarnLevel.String():
return []byte(color.YellowString(string(entry)))
case logp.ErrorLevel.String():
return []byte(color.RedString(string(entry)))
case logp.CriticalLevel.String():
return []byte(color.HiRedString(string(entry)))
default:
return entry
}
}
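
// For illustration: an "error" level entry comes back wrapped in ANSI red
// escape codes, unless github.com/fatih/color has color output globally
// disabled (e.g. when stdout is not a terminal):
//
//	out := addColorModifier([]byte(`{"log.level":"error"}`))
//	// out == "\x1b[31m" + `{"log.level":"error"}` + "\x1b[0m"
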
// stackWriter collects written byte slices and then pops them in
// reverse (LIFO) order.
// It supports filtering and modification of each written byte slice.
type stackWriter struct {
lines [][]byte
filter filterFunc
modifier modifierFunc
}
// Write implements io.Writer. Lines rejected by the filter are reported
// with errLineFiltered so callers can distinguish them from real errors.
func (s *stackWriter) Write(line []byte) (int, error) {
if s.filter != nil && !s.filter(line) {
return 0, errLineFiltered
}
// we must allocate and copy to preserve the state,
// `line` is normally a slice on the reading buffer which
// gets overwritten
l := make([]byte, len(line))
copy(l, line)
if s.modifier != nil {
l = s.modifier(l)
}
s.lines = append(s.lines, l)
return len(l), nil
}
// PopAll pops every line from the stack and writes into `w` in LIFO order.
func (s *stackWriter) PopAll(w io.Writer) error {
for i := len(s.lines) - 1; i >= 0; i-- {
_, err := w.Write(s.lines[i])
if err != nil {
return fmt.Errorf("failed to print the log line to the writer: %w", err)
}
_, err = w.Write([]byte{'\n'})
if err != nil {
return fmt.Errorf("failed to print the log line to the writer: %w", err)
}
}
return nil
}
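
// A minimal sketch of the stack behaviour: lines pushed via Write come back
// out of PopAll in reverse order, which printLogFile relies on because it
// scans files backwards:
//
//	var s stackWriter
//	s.Write([]byte("third"))  // found first while scanning backwards
//	s.Write([]byte("second"))
//	s.Write([]byte("first"))
//	_ = s.PopAll(os.Stdout)   // prints "first", "second", "third"
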
// newWrappedWriter creates a writer proxy that splits writes into lines,
// drops lines rejected by the given `filter`, transforms the rest with
// `modifier`, and forwards the result to `w`.
func newWrappedWriter(ctx context.Context, w io.Writer, filter filterFunc, modifier modifierFunc) io.Writer {
pr, pw := io.Pipe()
scanner := bufio.NewScanner(pr)
go func() {
for scanner.Scan() {
select {
case <-ctx.Done():
return
default:
line := scanner.Bytes()
if filter != nil && !filter(line) {
continue
}
if modifier != nil {
line = modifier(line)
}
_, _ = w.Write(line)
_, _ = w.Write([]byte{'\n'})
}
}
}()
return pw
}
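
// A usage sketch, assuming a cancellable context `ctx` and a hypothetical
// component ID: every write to the returned writer is split into lines,
// filtered, colorized, and forwarded to os.Stdout:
//
//	out := newWrappedWriter(ctx, os.Stdout, createComponentFilter("filestream-default"), addColorModifier)
//	fmt.Fprintln(out, `{"component":{"id":"filestream-default"},"log.level":"info"}`)
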
func newLogsCommandWithArgs(_ []string, streams *cli.IOStreams) *cobra.Command {
logsDir := filepath.Join(paths.Home(), logger.DefaultLogDirectory)
eventLogsDir := filepath.Join(logsDir, "events")
cmd := &cobra.Command{
Use: "logs",
Short: "Output Elastic Agent logs",
Long: "This command outputs, watches, and filters Elastic Agent logs.",
Run: func(c *cobra.Command, _ []string) {
if err := logsCmd(streams, c, logsDir, eventLogsDir); err != nil {
fmt.Fprintf(streams.Err, "Error: %v\n%s\n", err, troubleshootMessage())
os.Exit(1)
}
},
}
cmd.Flags().BoolP("follow", "f", false, "Do not stop when end of file is reached, but rather to wait for additional data to be appended to the log file.")
cmd.Flags().BoolP("no-color", "", false, "Do not apply colors to different log levels.")
cmd.Flags().IntP("number", "n", 10, "Maximum number of lines at the end of logs to output.")
cmd.Flags().Bool("exclude-events", false, "Excludes events log files")
cmd.Flags().StringP("component", "C", "", "Filter logs and output only logs for the given component ID.")
return cmd
}
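
// Typical invocations of the resulting subcommand (the component ID is a placeholder):
//
//	elastic-agent logs                   # print the last 10 lines of agent and event logs
//	elastic-agent logs -f -n 100         # print the last 100 lines, then follow new output
//	elastic-agent logs -C <component-id> # print only the lines for one component
//	elastic-agent logs --exclude-events  # skip the events log directory
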
func logsCmd(streams *cli.IOStreams, cmd *cobra.Command, logsDir, eventLogsDir string) error {
component, _ := cmd.Flags().GetString("component")
lines, _ := cmd.Flags().GetInt("number")
follow, _ := cmd.Flags().GetBool("follow")
noColor, _ := cmd.Flags().GetBool("no-color")
excludeEvents, _ := cmd.Flags().GetBool("exclude-events")
var (
filter filterFunc
modifier modifierFunc
)
if component != "" {
filter = createComponentFilter(component)
}
if !noColor {
modifier = addColorModifier
}
errChan := make(chan error)
go func() {
err := printLogs(cmd.Context(), streams.Out, logsDir, lines, follow, filter, modifier)
if err != nil {
errChan <- fmt.Errorf("failed to get logs: %w", err)
return
}
errChan <- nil
}()
if !excludeEvents {
go func() {
done := false
// The event log folder might not exist yet, so we keep retrying every
// five seconds until it appears or an unrelated error occurs
for !done {
err := printLogs(cmd.Context(), streams.Out, eventLogsDir, lines, follow, filter, modifier)
if err != nil {
if !strings.Contains(err.Error(), "logs/events: no such file or directory") {
errChan <- fmt.Errorf("failed to get event logs: %w", err)
return
}
time.Sleep(5 * time.Second)
continue
}
done = true
}
}()
}
if err := <-errChan; err != nil {
return err
}
return nil
}
// printLogs prints the last `lines` log lines from the log files in `dir`,
// applying `filter` and `modifier`, and writes them to `w`.
// If `follow` is true, it keeps printing log updates afterwards.
func printLogs(ctx context.Context, w io.Writer, dir string, lines int, follow bool, filter filterFunc, modifier modifierFunc) error {
files, err := getLogFilenames(dir)
if err != nil {
return fmt.Errorf("failed to fetch log filenames: %w", err)
}
if len(files) == 0 {
return nil
}
stackWriter := &stackWriter{
filter: filter,
modifier: modifier,
}
var (
fileIndex = len(files) - 1
printed = 0
)
buf := make([]byte, logBufferSize)
// capture the file size before new lines can change it, right before we
// start looking for the last N lines in this file, to minimize the
// likelihood of corrupted output
fileToFollow := files[fileIndex]
followOffset, err := getFileSize(fileToFollow)
if err != nil {
return fmt.Errorf("failed to prepare for watching file %q: %w", fileToFollow, err)
}
// look for the last N lines across all log files, starting with the most recent one
for {
filename := files[fileIndex]
// try to read the requested number of remaining lines from the end of the file
justPrinted, err := printLogFile(filename, lines-printed, stackWriter, buf)
if err != nil {
return fmt.Errorf("failed to print log file %q: %w", filename, err)
}
// account for the running total so we stop once we've printed the requested number
printed += justPrinted
if printed >= lines {
break
}
// if we have not read/printed enough lines, we switch to the previous file and repeat
fileIndex--
if fileIndex < 0 {
break
}
}
// all log lines written above were written in LIFO order, we need to invert that
// while writing to the destination writer
err = stackWriter.PopAll(w)
if err != nil {
return fmt.Errorf("failed to write the requested number of lines: %w", err)
}
if follow {
output := w
if filter != nil || modifier != nil {
output = newWrappedWriter(ctx, w, filter, modifier)
}
err = watchLogsDir(ctx, dir, fileToFollow, followOffset, output)
if err != nil {
return fmt.Errorf("failed to follow the logs: %w", err)
}
}
return nil
}
// printLogFile reads the file at the absolute path `filename` backwards, in chunks
// the size of the given `buf`, until it finds `maxLines` lines or the whole file
// has been read. It writes all found lines to `w` in reverse (LIFO) order.
func printLogFile(filename string, maxLines int, w *stackWriter, buf []byte) (linesWritten int, err error) {
file, err := os.Open(filename)
if err != nil {
return 0, fmt.Errorf("failed to open log file %q for reading: %w", filename, err)
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return 0, fmt.Errorf("failed to stat log file %q: %w", filename, err)
}
offset := info.Size()
bufferSize := int64(len(buf))
var leftOverBuf []byte
// read chunks in reverse, starting from the end of the file, and collect
// lines until the requested amount has been found, then stop
for {
offset -= bufferSize
if offset < 0 {
// shorten the buffer so we don't read anything extra during the
// last iteration of `ReadAt`
buf = buf[0 : bufferSize+offset]
// this chunk is smaller than the buffer
offset = 0
}
bytesRead, err := file.ReadAt(buf, offset)
if err != nil && !errors.Is(err, io.EOF) {
return linesWritten, fmt.Errorf("failed to read from log file %q: %w", filename, err)
}
chunk := buf[:bytesRead]
// extend the current chunk with the leftover (incomplete entry) carried
// over from the previously read chunk, which comes later in the file
if len(leftOverBuf) != 0 {
newChunk := make([]byte, len(chunk)+len(leftOverBuf))
copy(newChunk[:len(chunk)], chunk)
copy(newChunk[len(chunk):], leftOverBuf)
chunk = newChunk
leftOverBuf = nil
}
// the first entry ends at the end of the current chunk
entryEnd := len(chunk)
for i := len(chunk) - 1; i >= 0; i-- {
if chunk[i] != '\n' {
continue
}
// the log entry excluding the new line character
entry := chunk[i+1 : entryEnd]
// the next entry will end where this entry starts
entryEnd = i
// if it's a trailing new line, the entry is empty
if len(entry) == 0 {
continue
}
_, err := w.Write(entry)
if errors.Is(err, errLineFiltered) {
continue
}
if err != nil {
return linesWritten, fmt.Errorf("failed to print log line: %w", err)
}
linesWritten++
if linesWritten == maxLines {
return linesWritten, nil
}
}
// if the last newline character was found somewhere in the middle of the chunk,
// we keep the remainder, which will be joined with the next chunk
if entryEnd != 0 {
leftOverBuf = make([]byte, entryEnd)
copy(leftOverBuf, chunk[:entryEnd])
}
// if there is nothing left to read from the file
if offset == 0 {
break
}
}
// whatever remains before the first newline character in the file becomes
// the final line
if len(leftOverBuf) > 0 {
_, err := w.Write(leftOverBuf)
if errors.Is(err, errLineFiltered) {
return linesWritten, nil
}
if err != nil {
err = fmt.Errorf("failed to print log line: %w", err)
return linesWritten, err
}
linesWritten++
}
return linesWritten, nil
}
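
// A worked sketch of the backward scan: with a file containing "a\nbb\nccc\n"
// and a buffer larger than the file, the single chunk read covers the whole
// file, the trailing newline yields an empty entry that is skipped, and the
// entries are pushed as "ccc", "bb", "a" (LIFO); stackWriter.PopAll later
// restores the original order.
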
// getLogFilenames returns absolute paths to all log files in `dir` sorted in the log rotation order.
func getLogFilenames(dir string) ([]string, error) {
entries, err := os.ReadDir(dir)
if err != nil {
return nil, fmt.Errorf("failed to list logs directory: %w", err)
}
paths := make([]string, 0, len(entries))
for _, e := range entries {
if e.IsDir() || !logFilePattern.MatchString(e.Name()) {
continue
}
paths = append(paths, filepath.Join(dir, e.Name()))
}
sortLogFilenames(paths)
return paths, nil
}
// sortLogFilenames sorts filenames in the order of log rotation
func sortLogFilenames(filenames []string) {
sort.Slice(filenames, func(i, j int) bool {
// e.g. elastic-agent-20230515-1.ndjson => ["elastic-agent-20230515-1.ndjson", "", "20230515", "-1"]
iGroups := logFilePattern.FindStringSubmatch(filenames[i])
jGroups := logFilePattern.FindStringSubmatch(filenames[j])
switch {
// e.g. elastic-agent-20230515-1.ndjson vs elastic-agent-20230515-2.ndjson
case iGroups[2] == jGroups[2] && iGroups[3] != "" && jGroups[3] != "":
return iGroups[3] < jGroups[3]
// e.g. elastic-agent-20230515.ndjson vs elastic-agent-20230515-1.ndjson
case iGroups[2] == jGroups[2] && iGroups[3] != "":
return false
// e.g. elastic-agent-20230515-1.ndjson vs elastic-agent-20230515.ndjson
case iGroups[2] == jGroups[2] && jGroups[3] != "":
return true
// e.g. elastic-agent-20230515.ndjson vs elastic-agent-20230516.ndjson
default:
return iGroups[2] < jGroups[2]
}
})
}
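
// For example, the comparator above yields the rotation order:
//
//	in:  elastic-agent-20230516.ndjson, elastic-agent-20230515-1.ndjson, elastic-agent-20230515.ndjson
//	out: elastic-agent-20230515.ndjson, elastic-agent-20230515-1.ndjson, elastic-agent-20230516.ndjson
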
// watchLogsDir watches the log directory `dir` for new log lines, starting with the given `startFile`
// at its `startOffset`, and prints all new content to `w` until `ctx` is cancelled.
// Once new log lines are written to `startFile` they are printed to `w`.
// Once a new log file is created it switches to watching the new file instead.
// The new state is checked every `watchInterval`.
func watchLogsDir(ctx context.Context, dir, startFile string, startOffset int64, w io.Writer) (err error) {
curFile := startFile
curOffset := startOffset
ticker := time.NewTicker(watchInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return fmt.Errorf("watching %s interrupted: %w", startFile, ctx.Err())
case <-ticker.C:
size, err := getFileSize(curFile)
if err != nil {
return fmt.Errorf("failed to watch the logs dir %q: %w", dir, err)
}
if curOffset != size {
curOffset, err = tailFile(curFile, curOffset, w)
if err != nil {
return fmt.Errorf("failed to watch the logs dir %q: %w", dir, err)
}
}
files, err := getLogFilenames(dir)
if err != nil {
return fmt.Errorf("failed to watch the logs dir %q: %w", dir, err)
}
// find the position of the file we are currently watching
i := len(files) - 1
for ; i >= 0; i-- {
if files[i] == curFile {
break
}
}
// if it is still the most recent file, keep watching it
if i == len(files)-1 {
continue
}
// otherwise rotation happened: switch to the next file in rotation order
// (if the current file disappeared, i is -1 and we restart from the oldest file)
curFile = files[i+1]
curOffset = 0
}
}
}
// getFileSize returns the size of the given file.
func getFileSize(file string) (int64, error) {
info, err := os.Stat(file)
if err != nil {
return 0, fmt.Errorf("failed to stat file %q: %w", file, err)
}
return info.Size(), nil
}
// tailFile prints the tail of `file` to `w` starting from `offset` and
// returns the new offset, i.e. the file size after reading.
func tailFile(file string, offset int64, w io.Writer) (size int64, err error) {
f, err := os.Open(file)
if err != nil {
return 0, fmt.Errorf("failed to open file %q: %w", file, err)
}
defer f.Close()
_, err = f.Seek(offset, io.SeekStart)
if err != nil {
return 0, fmt.Errorf("failed to seek to %d in file %q: %w", offset, file, err)
}
_, err = io.Copy(w, f)
if err != nil {
return size, fmt.Errorf("failed to print file %s: %w", file, err)
}
size, err = getFileSize(file)
if err != nil {
return size, fmt.Errorf("failed to get file size %s: %w", file, err)
}
return size, nil
}
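
// A minimal usage sketch (the path and previous offset are hypothetical):
//
//	newOffset, err := tailFile("/path/to/logs/elastic-agent-20230515.ndjson", prevOffset, os.Stdout)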