in plugins/queue/mmap/segment_operation.go [111:135]
// doSwap unmaps memory-mapped segments so that q.mmapCount can drop back
// below q.MaxInMemSegments.
//
// The candidates are the q.MaxInMemSegments segment IDs immediately behind the
// writing segment, visited in descending ID order. A candidate whose index
// equals the current writing or reading index is never unmapped.
//
// It returns the first error reported by unmapSegment, and nil otherwise. A nil
// return does not guarantee that q.mmapCount is below the limit: when a whole
// pass over the candidates frees nothing, doSwap gives up instead of retrying.
func (q *Queue) doSwap() error {
	// Only the first return value of each call is used here (as a segment ID).
	rID, _ := q.meta.GetReadingOffset()
	wID, _ := q.meta.GetWritingOffset()
	// Shift the writing ID forward by one full queue capacity; presumably this
	// keeps the backwards walk non-negative while GetIndex maps the shifted IDs
	// onto the same slots (TODO confirm against GetIndex).
	logicWID := wID + int64(q.QueueCapacitySegments)
	// Lowest candidate ID; loop-invariant, so computed once.
	oldest := logicWID - int64(q.MaxInMemSegments)
	wIndex := q.GetIndex(wID)
	rIndex := q.GetIndex(rID)
	// only clear all memory-mapped files when the span between the reading and
	// writing segments is more than 1.5 times MaxInMemSegments.
	clearAll := (wID - rID + 1) > int64(q.MaxInMemSegments)*3/2
	for q.mmapCount >= q.MaxInMemSegments {
		mappedBefore := q.mmapCount
		for i := logicWID - 1; i >= 0 && i >= oldest; i-- {
			// the writing segment and the reading segment should stay in memory.
			idx := q.GetIndex(i)
			if idx == wIndex || idx == rIndex {
				continue
			}
			if err := q.unmapSegment(i); err != nil {
				return err
			}
			// q.MaxInMemSegments/2-1 means keeping half of the available slots
			// free to receive new data.
			if !clearAll && q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
				return nil
			}
		}
		// The candidate window is fixed for the whole call, so a pass that did
		// not lower mmapCount cannot succeed on a retry. Stop here rather than
		// spinning forever.
		if q.mmapCount >= mappedBefore {
			return nil
		}
	}
	return nil
}