func()

in plugins/queue/mmap/segment_operation.go [86:108]


func (q *Queue) segmentSwapper() {
	defer q.showDownWg.Done()
	ctx, _ := context.WithCancel(q.ctx) // nolint
	for {
		select {
		case id := <-q.markReadChannel:
			q.lock(id)
			if q.unmapSegment(id) != nil {
				log.Logger.Errorf("cannot unmap the markread segment: %d", id)
			}
			q.unlock(id)
		case <-q.insufficientMemChannel:
			if q.mmapCount >= q.MaxInMemSegments {
				if q.doSwap() != nil {
					log.Logger.Errorf("cannot get enough memory to receive new data")
				}
			}
			q.sufficientMemChannel <- struct{}{}
		case <-ctx.Done():
			return
		}
	}
}