func()

in plugins/queue/mmap/segment_operation.go [111:135]


// doSwap unmaps memory-mapped segments to make room once q.mmapCount has
// reached q.MaxInMemSegments.
//
// Candidates are scanned backwards, starting from the slot just before the
// writing position, over a window of at most q.MaxInMemSegments segment IDs.
// Slots that resolve to the current writing or reading index are never
// unmapped. Unless the backlog between reader and writer exceeds 1.5 times
// q.MaxInMemSegments, the scan stops as soon as roughly half of the slots are
// free again.
//
// It returns the first error reported by unmapSegment. If a complete pass over
// the window frees nothing, doSwap gives up and returns nil rather than
// looping forever.
func (q *Queue) doSwap() error {
	// Only the first return value of each call is needed here; the second is
	// deliberately discarded.
	rID, _ := q.meta.GetReadingOffset()
	wID, _ := q.meta.GetWritingOffset()
	// NOTE(review): shifting by the capacity presumably keeps the backwards scan
	// non-negative while still resolving to the same indices; this assumes
	// GetIndex wraps by QueueCapacitySegments (confirm against GetIndex).
	logicWID := wID + int64(q.QueueCapacitySegments)
	wIndex := q.GetIndex(wID)
	rIndex := q.GetIndex(rID)
	// Only clear every candidate in the window when the backlog is more than
	// 1.5 times MaxInMemSegments.
	clearAll := (wID - rID + 1) > int64(q.MaxInMemSegments)*3/2
	for q.mmapCount >= q.MaxInMemSegments {
		// Remember the count so a pass that frees nothing can be detected.
		mappedBefore := q.mmapCount
		for i := logicWID - 1; i >= 0 && i >= logicWID-int64(q.MaxInMemSegments); i-- {
			// The writing segment and the reading segment must stay in memory.
			if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex {
				continue
			}
			if err := q.unmapSegment(i); err != nil {
				return err
			}
			// q.MaxInMemSegments/2-1 means keeping half of the available slots
			// free to receive new data.
			if !clearAll && q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 {
				return nil
			}
		}
		// No segment was released in this pass (every candidate was skipped or
		// was not counted as mapped). Repeating the identical scan cannot
		// change that, so stop instead of spinning forever.
		if q.mmapCount >= mappedBefore {
			return nil
		}
	}
	return nil
}