func()

in galog.go [640:709]


func (lg *logger) runBackend(ctx context.Context, backend Backend, bq *BackendQueue) {
	// enqueue guarantees the max size of the entry queue.
	enqueue := func(force bool, config Config, item *LogEntry) bool {
		bq.entriesMutex.Lock()
		defer bq.entriesMutex.Unlock()

		if !force && len(bq.entries) == 0 {
			return false
		}

		if config == nil || config.QueueSize() == 0 {
			return false
		}

		bq.entries = append(bq.entries, item)
		queueSize := config.QueueSize()
		// If we've reached the queue limit we remove the oldest entry from the
		// queue.
		if queueSize > 0 && len(bq.entries) > queueSize {
			bq.entries = bq.entries[1:]
		}

		return true
	}

	var wg sync.WaitGroup

	wg.Add(1)
	// Runs the the periodical processing of the queue.
	go func() {
		defer wg.Done()
		for {
			select {
			// Context cancelation handling.
			case <-ctx.Done():
				bq.ticker.Stop()
				return
			// Periodically queue processing.
			case <-bq.ticker.C:
				lg.flushEnqueuedEntries(ctx, bq)
			}
		}
	}()

	wg.Add(1)
	// Runs the entry enqueueing and backend unregistration.
	go func() {
		defer wg.Done()
		for {
			select {
			// Context cancelation handling.
			case <-ctx.Done():
				return
			// Backend unregistering handling.
			case <-bq.cancel:
				return
			// Entry enqueueing handling.
			case entry := <-bq.bus:
				if enqueue(false, backend.Config(), entry) {
					continue
				}
				if err := backend.Log(entry); err != nil {
					enqueue(true, backend.Config(), entry)
				}
			}
		}
	}()

	wg.Wait()
}