func()

in logger/buffered_logger.go [88:147]


// Start wires up the logging pipeline for one container and blocks until the
// pipeline finishes.
//
// It fetches the container pipes from the underlying logger, launches the
// tracing routine in the background, starts one goroutine that forwards logs
// from the ring buffer to the destination and one goroutine per pipe that
// reads that pipe, then invokes ready and waits for the pipeline goroutines.
//
// Parameters:
//   - ctx: parent context; the pipe readers receive a context derived from it.
//   - cleanupTime: passed through unchanged to sendLogMessagesToDestination.
//   - ready: callback invoked after all pipeline goroutines have been started.
//
// It returns the error from GetPipes, a wrapped error from ready, or the first
// non-nil error reported by a pipeline goroutine.
func (bl *bufferedLogger) Start(
	ctx context.Context,
	cleanupTime *time.Duration,
	ready func() error,
) error {
	pipeNameToPipe, err := bl.l.GetPipes()
	if err != nil {
		return err
	}

	var logWG sync.WaitGroup
	logWG.Add(1)
	// Buffered so the deferred stop signal below never blocks, even if the
	// tracing routine has already returned on its own.
	stopTracingLogRoutingChan := make(chan bool, 1)
	// Reset the shared counters before any reader or sender is running.
	atomic.StoreUint64(&bytesReadFromSrc, 0)
	atomic.StoreUint64(&bytesSentToDst, 0)
	atomic.StoreUint64(&numberOfNewLineChars, 0)
	go func() {
		startTracingLogRouting(bl.containerID, stopTracingLogRoutingChan)
		logWG.Done()
	}()
	// On every return path: ask the tracing routine to stop and wait for it.
	defer func() {
		debug.SendEventsToLog(DaemonName, "Sending signal to stop the ticker.", debug.DEBUG, 0)
		stopTracingLogRoutingChan <- true
		logWG.Wait()
	}()

	// The context derived by errgroup is only cancelled when a group goroutine
	// fails or when Wait returns. Start can return before Wait (ready failure
	// below), so cancel explicitly on every return path; otherwise the pipe
	// readers would keep running with a context that is never cancelled.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	errGroup, ctx := errgroup.WithContext(ctx)
	// Start the goroutine of underlying log driver to consume logs from ring buffer and
	// send logs to destination when there's any.
	errGroup.Go(func() error {
		debug.SendEventsToLog(DaemonName, "Starting consuming logs from ring buffer", debug.INFO, 0)
		return bl.sendLogMessagesToDestination(cleanupTime)
	})

	// Start reading logs from container pipes.
	for pn, p := range pipeNameToPipe {
		// Copy pn and p to per-iteration variables so each closure captures its
		// own pipe (required before Go 1.22 loop-variable semantics).
		source := pn
		pipe := p

		errGroup.Go(func() error {
			debug.SendEventsToLog(DaemonName, fmt.Sprintf("Reading logs from pipe %s", source), debug.DEBUG, 0)
			logErr := bl.saveLogMessagesToRingBuffer(ctx, pipe, source)
			if logErr != nil {
				err := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
				debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
				return err
			}
			return nil
		})
	}

	// Signal that the container is ready to be started.
	if err := ready(); err != nil {
		// The deferred cancel tells the pipe readers to stop. We do not Wait on
		// the group here: sendLogMessagesToDestination takes no context, so
		// waiting for it on this path could block.
		return fmt.Errorf("failed to check container ready status: %w", err)
	}

	// Wait() will return the first error it receives.
	return errGroup.Wait()
}