in plugins/queue/mmap/segment_operation.go [86:108]
// segmentSwapper is an event loop that keeps servicing segment-related
// requests until the queue context is cancelled. It reacts to three events:
//
//   - an id arriving on markReadChannel: the segment with that id is unmapped
//     while holding the per-segment lock;
//   - a signal on insufficientMemChannel: if mmapCount has reached
//     MaxInMemSegments a swap is attempted, and in every case a reply is sent
//     on sufficientMemChannel;
//   - cancellation of the context derived from q.ctx: the loop returns.
//
// showDownWg.Done is called when the function returns.
// NOTE(review): presumably started as a goroutine with a matching
// showDownWg.Add by the caller — confirm against the call site.
func (q *Queue) segmentSwapper() {
	defer q.showDownWg.Done()

	// Keep the cancel function and release it on exit; discarding it leaks the
	// child context's resources until the parent context is cancelled.
	ctx, cancel := context.WithCancel(q.ctx)
	defer cancel()

	for {
		select {
		case id := <-q.markReadChannel:
			q.lock(id)
			if err := q.unmapSegment(id); err != nil {
				// Include the failure cause so the log is actionable.
				log.Logger.Errorf("cannot unmap the markread segment: %d, error: %v", id, err)
			}
			q.unlock(id)
		case <-q.insufficientMemChannel:
			if q.mmapCount >= q.MaxInMemSegments {
				if err := q.doSwap(); err != nil {
					log.Logger.Errorf("cannot get enough memory to receive new data: %v", err)
				}
			}
			// The reply is sent whether or not a swap happened or succeeded,
			// so a failed swap is only reported through the log above.
			q.sufficientMemChannel <- struct{}{}
		case <-ctx.Done():
			return
		}
	}
}