executors/kubernetes/log_processor.go:
package kubernetes

import (
	"bufio"
	"context"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/sirupsen/logrus"
	"golang.org/x/sync/errgroup"
	"k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
)
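
// logStreamer streams the contents of a log file to the given writer,
// starting at the given byte offset.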
type logStreamer interface {
	Stream(ctx context.Context, offset int64, output io.Writer) error
	fmt.Stringer
}
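
// logScanner wraps a buffered reader and records the first error
// encountered while reading log lines.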
type logScanner struct {
	reader *bufio.Reader
	err    error
}

// Err returns the first non-EOF error that was encountered by the Scanner.
func (ls *logScanner) Err() error {
	if ls.err == io.EOF {
		return nil
	}
	return ls.err
}
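
// kubernetesLogStreamer streams the log file of a pod container by executing
// gitlab-runner-helper's read-logs command inside the container.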
type kubernetesLogStreamer struct {
	kubernetesLogProcessorPodConfig
	client       kubernetes.Interface
	clientConfig *restclient.Config
	executor     RemoteExecutor
}
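
// Stream runs gitlab-runner-helper's read-logs command inside the container,
// writing the log contents from the given offset to output.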
func (s *kubernetesLogStreamer) Stream(ctx context.Context, offset int64, output io.Writer) error {
	exec := ExecOptions{
		Namespace:     s.namespace,
		PodName:       s.pod,
		ContainerName: s.container,
		Stdin:         false,
		Command: []string{
			"gitlab-runner-helper",
			"read-logs",
			"--path",
			s.logPath,
			"--offset",
			strconv.FormatInt(offset, 10),
			"--wait-file-timeout",
			s.waitLogFileTimeout.String(),
		},
		Out:        output,
		Err:        output,
		Executor:   s.executor,
		KubeClient: s.client,
		Config:     s.clientConfig,
		Context:    ctx,
	}

	return exec.executeRequest()
}

func (s *kubernetesLogStreamer) String() string {
	return fmt.Sprintf("%s/%s/%s:%s", s.namespace, s.pod, s.container, s.logPath)
}
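
// logProcessor consumes a container's log output and exposes it as a channel of lines.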
type logProcessor interface {
	// Process listens for log lines.
	// Consumers must read from the returned channel until it is closed.
	// Consumers are also notified of errors through the error channel.
	Process(ctx context.Context) (<-chan string, <-chan error)

	// Finalize waits for all goroutines started by Process() to finish.
	Finalize()
}
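
// backoffCalculator returns how long to wait before the given reattach attempt.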
type backoffCalculator interface {
	ForAttempt(attempt float64) time.Duration
}

// kubernetesLogProcessor processes the logs from a container and continually
// tries to reattach to the stream, stopping only when the passed context is cancelled.
type kubernetesLogProcessor struct {
	backoff     backoffCalculator
	logger      logrus.FieldLogger
	logStreamer logStreamer
	wg          sync.WaitGroup
	logsOffset  int64
}
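
// kubernetesLogProcessorPodConfig identifies the pod, container and log file to stream from.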
type kubernetesLogProcessorPodConfig struct {
	namespace          string
	pod                string
	container          string
	logPath            string
	waitLogFileTimeout time.Duration
}
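
// newKubernetesLogProcessor returns a log processor that streams the configured
// pod container's log file through a kubernetesLogStreamer.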
func newKubernetesLogProcessor(
	client kubernetes.Interface,
	clientConfig *restclient.Config,
	backoff backoffCalculator,
	logger logrus.FieldLogger,
	podCfg kubernetesLogProcessorPodConfig,
) *kubernetesLogProcessor {
	logStreamer := &kubernetesLogStreamer{
		kubernetesLogProcessorPodConfig: podCfg,
		client:                          client,
		clientConfig:                    clientConfig,
		executor:                        new(DefaultRemoteExecutor),
	}

	return &kubernetesLogProcessor{
		backoff:     backoff,
		logger:      logger,
		logStreamer: logStreamer,
	}
}
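
// Process starts streaming the logs on a separate goroutine and returns the
// channels on which log lines and errors are delivered. Both channels are
// closed once the context is cancelled.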
func (l *kubernetesLogProcessor) Process(ctx context.Context) (<-chan string, <-chan error) {
	outCh := make(chan string)
	errCh := make(chan error)
	go func() {
		defer close(outCh)
		defer close(errCh)
		l.attach(ctx, outCh, errCh)
	}()

	return outCh, errCh
}

func (l *kubernetesLogProcessor) Finalize() {
	l.wg.Wait()
}
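
// attach repeatedly connects to the log stream, backing off between attempts,
// until the passed context is cancelled.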
func (l *kubernetesLogProcessor) attach(ctx context.Context, outCh chan string, errCh chan error) {
	var (
		attempt         float64 = -1
		backoffDuration time.Duration
	)

	for {
		// We never exit this loop ourselves, because the processLogs goroutine must keep running.
		// When the error below is sent to fail the ongoing step, a new cleanup variables step is started.
		// As the pod is still running, a processLogs goroutine is not launched again.
		// This is why, even though the error is sent to fail the ongoing step,
		// we keep trying to reconnect to the output log, as a new one is created for the cleanup variables step.
		attempt++
		if attempt > 0 {
			backoffDuration = l.backoff.ForAttempt(attempt)
			l.logger.Debugln(fmt.Sprintf(
				"Backing off reattaching log for %s for %s (attempt %f)",
				l.logStreamer,
				backoffDuration,
				attempt,
			))
		}

		select {
		case <-ctx.Done():
			l.logger.Debugln(fmt.Sprintf("Detaching from log... %v", ctx.Err()))
			return
		case <-time.After(backoffDuration):
			err := l.processStream(ctx, outCh)
			exitCode := getExitCode(err)
			switch {
			case exitCode == outputLogFileNotExistsExitCode:
				// The cleanup variables step recreates a new output.log file
				// where the shells.TrapCommandExitStatus is written.
				// To not miss this line, the offset must be reset when we reconnect to the newly created log.
				l.logsOffset = 0
				errCh <- fmt.Errorf("output log file deleted, cannot continue: %w", err)
			case err != nil:
				l.logger.Warningln(fmt.Sprintf("Error %v. Retrying...", err))
			default:
				l.logger.Debug("processStream exited with no error")
			}
		}
	}
}
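
// processStream connects the log streamer to readLogs through a pipe so that
// log lines are forwarded to outCh, and returns once either side finishes.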
func (l *kubernetesLogProcessor) processStream(ctx context.Context, outCh chan string) error {
	reader, writer := io.Pipe()
	defer func() {
		_ = reader.Close()
		_ = writer.Close()
	}()

	// Using errgroup.WithContext doesn't work here since if either one of the goroutines
	// exits with a nil error, we can't signal the other one to exit.
	ctx, cancel := context.WithCancel(ctx)

	var gr errgroup.Group

	logsOffset := l.logsOffset
	gr.Go(func() error {
		defer cancel()
		err := l.logStreamer.Stream(ctx, logsOffset, writer)
		// prevent printing an error that the container exited
		// when the context is already cancelled
		if errors.Is(ctx.Err(), context.Canceled) {
			return nil
		}
		if err != nil {
			err = fmt.Errorf("streaming logs %s: %w", l.logStreamer, err)
		}
		return err
	})

	gr.Go(func() error {
		defer cancel()
		err := l.readLogs(ctx, reader, outCh)
		if err != nil {
			err = fmt.Errorf("reading logs %s: %w", l.logStreamer, err)
		}
		return err
	})

	return gr.Wait()
}
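
// readLogs reads lines from the piped log stream, keeps track of the latest
// log offset, and forwards each line to outCh until the stream ends or the
// context is cancelled.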
func (l *kubernetesLogProcessor) readLogs(ctx context.Context, logs io.Reader, outCh chan string) error {
	var previousLogsOffset int64 = -1
	logsScanner, linesCh := l.scan(ctx, logs)
	for {
		select {
		case <-ctx.Done():
			return nil
		case line, more := <-linesCh:
			if !more {
				l.logger.Debug("No more data in linesCh")
				return logsScanner.Err()
			}

			newLogsOffset, logLine := l.parseLogLine(line)
			if newLogsOffset != -1 {
				l.logsOffset = newLogsOffset
			}

			// When the read buffer doesn't end with a newline, the helper appends one,
			// which makes the line longer than the shift in the log offset.
			// When the line is longer than the offset shift and its last character is '\n',
			// that character can be safely removed, as it is most likely the one added by the helper.
			if lineLen := len(logLine); previousLogsOffset != -1 &&
				lineLen > int(newLogsOffset-previousLogsOffset) && logLine[lineLen-1] == '\n' {
				logLine = logLine[:lineLen-1]
			}
			previousLogsOffset = newLogsOffset

			outCh <- logLine
		}
	}
}
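
// scan reads the log stream line by line on a separate goroutine and returns
// the scanner together with a channel on which the read lines are delivered.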
func (l *kubernetesLogProcessor) scan(ctx context.Context, logs io.Reader) (*logScanner, <-chan string) {
	logsScanner := &logScanner{
		reader: bufio.NewReaderSize(logs, bufio.MaxScanTokenSize),
		err:    nil,
	}

	linesCh := make(chan string)
	l.wg.Add(1)
	go func() {
		defer l.wg.Done()
		defer close(linesCh)

		// This goroutine will exit when the calling method closes the logs stream or the context is cancelled
		for {
			data, err := logsScanner.reader.ReadString('\n')
			if err != nil {
				logsScanner.err = err
				break
			}

			select {
			case <-ctx.Done():
				return
			case linesCh <- data:
			}
		}
	}()

	return logsScanner, linesCh
}

// Each line starts with its byte offset. We need this to resume the log from that point
// if we detach for some reason. The format is "10 log line continues as normal".
// The line doesn't include the newline character.
// Lines without an offset are acceptable and return -1 as the offset.
func (l *kubernetesLogProcessor) parseLogLine(line string) (int64, string) {
	if line == "" {
		return -1, ""
	}

	offsetIndex := strings.Index(line, " ")
	if offsetIndex == -1 {
		return -1, line
	}

	offset := line[:offsetIndex]
	parsedOffset, err := strconv.ParseInt(offset, 10, 64)
	if err != nil {
		return -1, line
	}

	logLine := line[offsetIndex+1:]
	return parsedOffset, logLine
}