in logger/buffered_logger.go [88:147]
// Start launches the log-forwarding goroutines for the container and blocks
// until they have all finished.
//
// It fetches the container pipes from the underlying logger, resets the
// package-level byte/newline counters, starts the tracing goroutine, and then
// starts one goroutine that consumes logs from the ring buffer and sends them
// to the destination, plus one goroutine per pipe that saves log messages
// into the ring buffer. After those are launched it invokes ready.
//
// Parameters:
//   - ctx: parent context; the per-pipe readers receive a context derived
//     from it.
//   - cleanupTime: passed through unchanged to sendLogMessagesToDestination.
//   - ready: callback invoked once all goroutines have been started.
//
// Returns the error from GetPipes, a wrapped error from ready, or the first
// non-nil error reported by any of the goroutines.
func (bl *bufferedLogger) Start(
	ctx context.Context,
	cleanupTime *time.Duration,
	ready func() error,
) error {
	pipeNameToPipe, err := bl.l.GetPipes()
	if err != nil {
		return err
	}

	var logWG sync.WaitGroup
	logWG.Add(1)
	// Buffered so the deferred stop signal below never blocks on the send.
	stopTracingLogRoutingChan := make(chan bool, 1)
	atomic.StoreUint64(&bytesReadFromSrc, 0)
	atomic.StoreUint64(&bytesSentToDst, 0)
	atomic.StoreUint64(&numberOfNewLineChars, 0)
	go func() {
		startTracingLogRouting(bl.containerID, stopTracingLogRoutingChan)
		logWG.Done()
	}()
	defer func() {
		debug.SendEventsToLog(DaemonName, "Sending signal to stop the ticker.", debug.DEBUG, 0)
		stopTracingLogRoutingChan <- true
		logWG.Wait()
	}()

	// Guarantee the context handed to the pipe readers is cancelled on every
	// return path. Without this, an early return on a ready() failure skips
	// errGroup.Wait(), so the errgroup-derived context would never be
	// cancelled and the readers would get no signal to stop.
	// NOTE(review): this relies on saveLogMessagesToRingBuffer honoring ctx
	// cancellation - confirm against its implementation.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errGroup, ctx := errgroup.WithContext(ctx)
	// Start the goroutine of underlying log driver to consume logs from ring buffer and
	// send logs to destination when there's any.
	errGroup.Go(func() error {
		debug.SendEventsToLog(DaemonName, "Starting consuming logs from ring buffer", debug.INFO, 0)
		return bl.sendLogMessagesToDestination(cleanupTime)
	})

	// Start reading logs from container pipes.
	for pn, p := range pipeNameToPipe {
		// Copy the loop variables so each closure captures its own pair
		// (required for Go versions before 1.22).
		source := pn
		pipe := p
		errGroup.Go(func() error {
			debug.SendEventsToLog(DaemonName, fmt.Sprintf("Reading logs from pipe %s", source), debug.DEBUG, 0)
			logErr := bl.saveLogMessagesToRingBuffer(ctx, pipe, source)
			if logErr != nil {
				// Distinct name so the outer err is not shadowed.
				pipeErr := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
				debug.SendEventsToLog(DaemonName, pipeErr.Error(), debug.ERROR, 1)
				return pipeErr
			}
			return nil
		})
	}

	// Signal that the container is ready to be started.
	// On failure we return without waiting for the goroutines: the sender
	// goroutine takes no context and could block indefinitely. The deferred
	// cancel above still signals the pipe readers to stop.
	if err := ready(); err != nil {
		return fmt.Errorf("failed to check container ready status: %w", err)
	}

	// Wait() will return the first error it receives.
	return errGroup.Wait()
}