in galog.go [640:709]
// runBackend drives a single backend until ctx is canceled or the backend is
// unregistered (signaled through bq.cancel). It blocks until both of its
// worker goroutines have exited:
//
//   - a flusher that calls flushEnqueuedEntries on every tick of bq.ticker;
//   - a receiver that takes entries from bq.bus and either hands them straight
//     to backend.Log or appends them to bq.entries.
//
// When the receiver exits, for either reason, the flusher is stopped too and
// bq.ticker is stopped, so an unregistered backend does not keep a goroutine
// and a running ticker alive until ctx is eventually canceled.
func (lg *logger) runBackend(ctx context.Context, backend Backend, bq *BackendQueue) {
	// enqueue appends item to bq.entries while holding bq.entriesMutex and
	// reports whether the item was queued. It guarantees the max size of the
	// entry queue.
	//
	// Without force, nothing is queued while the queue is empty: the caller is
	// expected to try the backend directly first. With force the item is
	// queued regardless of the current queue length. In both cases nothing is
	// queued when config is nil or reports a queue size of zero.
	enqueue := func(force bool, config Config, item *LogEntry) bool {
		bq.entriesMutex.Lock()
		defer bq.entriesMutex.Unlock()

		if !force && len(bq.entries) == 0 {
			return false
		}
		if config == nil {
			return false
		}

		// Read the limit once so the "queueing disabled" check and the trim
		// below are based on the same value.
		queueSize := config.QueueSize()
		if queueSize == 0 {
			return false
		}

		bq.entries = append(bq.entries, item)
		// If we've exceeded the queue limit we remove the oldest entry from
		// the queue. A negative limit never trims.
		if queueSize > 0 && len(bq.entries) > queueSize {
			bq.entries = bq.entries[1:]
		}
		return true
	}

	// receiverDone is closed when the receiver goroutine returns. The flusher
	// watches it instead of bq.cancel so that it never competes with the
	// receiver for the unregistration signal.
	receiverDone := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(2)

	// Runs the periodic processing of the queue.
	go func() {
		defer wg.Done()
		// Stop the ticker on every exit path, not only on context
		// cancellation.
		defer bq.ticker.Stop()
		for {
			select {
			// Context cancelation handling.
			case <-ctx.Done():
				return
			// The receiver is gone (context canceled or backend
			// unregistered); nothing is left to feed the queue.
			case <-receiverDone:
				return
			// Periodic queue processing.
			case <-bq.ticker.C:
				lg.flushEnqueuedEntries(ctx, bq)
			}
		}
	}()

	// Runs the entry enqueueing and backend unregistration.
	go func() {
		defer wg.Done()
		defer close(receiverDone)
		for {
			select {
			// Context cancelation handling.
			case <-ctx.Done():
				return
			// Backend unregistering handling.
			case <-bq.cancel:
				return
			// Entry enqueueing handling.
			case entry := <-bq.bus:
				// If entries are already waiting in the queue, this one is
				// appended behind them rather than written directly.
				if enqueue(false, backend.Config(), entry) {
					continue
				}
				// Queue is empty (or queueing is disabled): write directly
				// and fall back to the queue only if the backend fails. The
				// result of the forced enqueue is intentionally ignored; when
				// queueing is disabled the entry is dropped.
				if err := backend.Log(entry); err != nil {
					enqueue(true, backend.Config(), entry)
				}
			}
		}
	}()

	wg.Wait()
}