libbeat/publisher/queue/memqueue/runloop.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package memqueue

import (
	"time"

	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// runLoop internal state. These fields could mostly be local variables
// in runLoop.run(), but they're exposed here to facilitate testing. In a
// live queue, only the runLoop goroutine should read or write these fields.
type runLoop struct {
	broker *broker

	// observer is a metrics observer used to report internal queue state.
	observer queue.Observer

	// The index of the beginning of the current ring buffer within its
	// backing array. If the queue isn't empty, bufPos points to the oldest
	// remaining event.
	bufPos int

	// The total number of events in the queue.
	eventCount int

	// The number of consumed events waiting for acknowledgment. The next Get
	// request will return events starting at position
	// (bufPos + consumedCount) % len(buf).
	consumedCount int

	// The list of batches that have been consumed and are waiting to be sent
	// to ackLoop for acknowledgment handling. (This list doesn't contain all
	// outstanding batches, only the ones not yet forwarded to ackLoop.)
	consumedBatches batchList

	// If there aren't enough events ready to fill an incoming get request,
	// the queue may block based on its flush settings. When this happens,
	// pendingGetRequest stores the request until we're ready to handle it.
	pendingGetRequest *getRequest

	// getTimer tracks the configured flush timeout; when it fires, we respond
	// to a pending getRequest even if we can't fill the requested event count.
	// It is active if and only if pendingGetRequest is non-nil.
	getTimer *time.Timer

	// closing is set when a close request is received. Once closing is true,
	// the queue will not accept any new events, but will continue responding
	// to Gets and Acks to allow pending events to complete on shutdown.
	closing bool

	// TODO (https://github.com/elastic/beats/issues/37893): entry IDs were a
	// workaround for an external project that no longer exists. At this point
	// they just complicate the API and should be removed.
	nextEntryID queue.EntryID
}

func newRunLoop(broker *broker, observer queue.Observer) *runLoop {
	var timer *time.Timer

	// Create the timer we'll use for get requests, but keep it stopped until
	// a get request is actually waiting.
	if broker.settings.FlushTimeout > 0 {
		timer = time.NewTimer(broker.settings.FlushTimeout)
		if !timer.Stop() {
			<-timer.C
		}
	}
	return &runLoop{
		broker:   broker,
		observer: observer,
		getTimer: timer,
	}
}

func (l *runLoop) run() {
	for l.broker.ctx.Err() == nil {
		l.runIteration()
	}
}

// Perform one iteration of the queue's main run loop. Broken out into a
// standalone helper function to allow testing of loop invariants.
func (l *runLoop) runIteration() {
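	// Channel gating: each channel variable below stays nil unless its
	// operation is currently valid. Sends and receives on a nil channel
	// block forever, so a select case on a nil channel is effectively
	// disabled for this iteration; this is how the loop turns individual
	// request types on and off without extra bookkeeping.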
	var pushChan chan pushRequest
	// Push requests are enabled if the queue isn't full and isn't closing.
	if l.eventCount < len(l.broker.buf) && !l.closing {
		pushChan = l.broker.pushChan
	}

	var getChan chan getRequest
	// Get requests are enabled if the queue has events that weren't yet sent
	// to consumers, and no existing request is active.
	if l.pendingGetRequest == nil && l.eventCount > l.consumedCount {
		getChan = l.broker.getChan
	}

	var consumedChan chan batchList
	// Enable sending to the scheduled ACKs channel if we have
	// something to send.
	if !l.consumedBatches.empty() {
		consumedChan = l.broker.consumedChan
	}

	var timeoutChan <-chan time.Time
	// Enable the timeout channel if a get request is waiting for events.
	if l.pendingGetRequest != nil {
		timeoutChan = l.getTimer.C
	}

	select {
	case <-l.broker.closeChan:
		l.closing = true
		close(l.broker.closingChan)
		// Get requests are handled immediately during shutdown.
		l.maybeUnblockGetRequest()

	case <-l.broker.ctx.Done():
		// The queue is fully shut down, do nothing.
		return

	case req := <-pushChan: // producer pushing new event
		l.handleInsert(&req)

	case req := <-getChan: // consumer asking for next batch
		l.handleGetRequest(&req)

	case consumedChan <- l.consumedBatches:
		// We've sent all the pending batches to the ackLoop for processing,
		// clear the pending list.
		l.consumedBatches = batchList{}

	case count := <-l.broker.deleteChan:
		l.handleDelete(count)

	case <-timeoutChan:
		// The get timer has expired, handle the blocked request.
		l.getTimer.Stop()
		l.handleGetReply(l.pendingGetRequest)
		l.pendingGetRequest = nil
	}

	// Check for final shutdown (if we are closing and the event buffer is
	// completely drained).
	if l.closing && l.eventCount == 0 {
		l.broker.ctxCancel()
	}
}

func (l *runLoop) handleGetRequest(req *getRequest) {
	if req.entryCount <= 0 || req.entryCount > l.broker.settings.MaxGetRequest {
		req.entryCount = l.broker.settings.MaxGetRequest
	}
	if l.getRequestShouldBlock(req) {
		l.pendingGetRequest = req
		l.getTimer.Reset(l.broker.settings.FlushTimeout)
		return
	}
	l.handleGetReply(req)
}

func (l *runLoop) getRequestShouldBlock(req *getRequest) bool {
	if l.broker.settings.FlushTimeout <= 0 || l.closing {
		// Never block if the flush timeout isn't positive, or during shutdown.
		return false
	}
	eventsAvailable := l.eventCount - l.consumedCount
	// Block if the available events aren't enough to fill the request.
	return eventsAvailable < req.entryCount
}

// Respond to the given get request without blocking or waiting for more
// events.
func (l *runLoop) handleGetReply(req *getRequest) {
	eventsAvailable := l.eventCount - l.consumedCount
	batchSize := req.entryCount
	if eventsAvailable < batchSize {
		batchSize = eventsAvailable
	}

	startIndex := l.bufPos + l.consumedCount
	batch := newBatch(l.broker, startIndex, batchSize)

	batchBytes := 0
	for i := 0; i < batchSize; i++ {
		batchBytes += batch.rawEntry(i).eventSize
	}

	// Send the batch to the caller and update internal state.
	req.responseChan <- batch
	l.consumedBatches.append(batch)
	l.consumedCount += batchSize
	l.observer.ConsumeEvents(batchSize, batchBytes)
}

func (l *runLoop) handleDelete(count int) {
	byteCount := 0
	for i := 0; i < count; i++ {
		entry := l.broker.buf[(l.bufPos+i)%len(l.broker.buf)]
		byteCount += entry.eventSize
	}
	// Advance position and counters. Event data was already cleared in
	// batch.FreeEntries when the events were vended.
	l.bufPos = (l.bufPos + count) % len(l.broker.buf)
	l.eventCount -= count
	l.consumedCount -= count
	l.observer.RemoveEvents(count, byteCount)
}
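// A worked example of the wraparound arithmetic used in handleDelete and
// insert below: with len(broker.buf) == 8, bufPos == 6, and eventCount == 4,
// the queue occupies indices 6, 7, 0, 1. Deleting 3 events advances bufPos
// to (6+3)%8 == 1 and leaves one event at index 1; the next insert then
// lands at index (1+1)%8 == 2.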
func (l *runLoop) handleInsert(req *pushRequest) {
	l.insert(req, l.nextEntryID)

	// Send back the new event id.
	req.resp <- l.nextEntryID

	l.nextEntryID++
	l.eventCount++

	// See if this gave us enough for a new batch.
	l.maybeUnblockGetRequest()
}

// Checks if we can handle pendingGetRequest yet, and handles it if so.
func (l *runLoop) maybeUnblockGetRequest() {
	// If a get request is blocked waiting for more events, check if
	// we should unblock it.
	if getRequest := l.pendingGetRequest; getRequest != nil {
		if !l.getRequestShouldBlock(getRequest) {
			l.pendingGetRequest = nil
			if !l.getTimer.Stop() {
				<-l.getTimer.C
			}
			l.handleGetReply(getRequest)
		}
	}
}

func (l *runLoop) insert(req *pushRequest, id queue.EntryID) {
	index := (l.bufPos + l.eventCount) % len(l.broker.buf)
	l.broker.buf[index] = queueEntry{
		event:      req.event,
		eventSize:  req.eventSize,
		id:         id,
		producer:   req.producer,
		producerID: req.producerID,
	}
	l.observer.AddEvent(req.eventSize)
}
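// Lifecycle of a blocked get, as implemented above: handleGetRequest parks a
// request in pendingGetRequest when FlushTimeout is positive and fewer than
// entryCount events are available, then resets getTimer. The request is
// answered by whichever happens first: enough inserts arrive (handleInsert
// calls maybeUnblockGetRequest), the queue starts closing, or the timer
// fires in runIteration. In every case handleGetReply responds with however
// many events are available.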