in plugins/queue/mmap/queue.go [114:159]
// Initialize prepares the queue for use. It normalizes SegmentSize and
// MaxInMemSegments, loads the metadata, resets the reading and writing offsets
// to the committed and watermark offsets, obtains the segments those offsets
// point to, and starts the segmentSwapper and flush goroutines.
//
// It returns an error when the metadata cannot be created (the cause is
// wrapped and remains reachable through errors.Is / errors.As) or when either
// of the two segments cannot be obtained.
func (q *Queue) Initialize() error {
	// The size of each segment file should be a multiple of the page size, so
	// drop any remainder from the configured size.
	pageSize := os.Getpagesize()
	if q.SegmentSize%pageSize != 0 {
		q.SegmentSize -= q.SegmentSize % pageSize
	}
	// When less than one full page is left, fall back to data4KB.
	if q.SegmentSize/pageSize == 0 {
		q.SegmentSize = data4KB
	}
	// The minimum MaxInMemSegments value should be 4.
	if q.MaxInMemSegments < minimumSegments {
		q.MaxInMemSegments = minimumSegments
	}
	q.queueName = q.Name() + "_" + q.PipeName

	// Load the metadata and override the reading and writing offsets with the
	// committed and watermark offsets respectively.
	md, err := meta.NewMetaData(q.queueName, q.QueueCapacitySegments)
	if err != nil {
		// %w keeps the underlying error in the chain; the message text is the
		// same as it would be with %v.
		return fmt.Errorf("error in creating the metadata: %w", err)
	}
	q.meta = md
	cmID, cmOffset := md.GetCommittedOffset()
	wmID, wmOffset := md.GetWatermarkOffset()
	md.PutWritingOffset(wmID, wmOffset)
	md.PutReadingOffset(cmID, cmOffset)

	// Keep the reading and writing segments in memory. The slice has one slot
	// per segment of the queue capacity.
	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
	if _, err := q.GetSegment(cmID); err != nil {
		return err
	}
	if _, err := q.GetSegment(wmID); err != nil {
		return err
	}

	// Init components. Every channel is unbuffered except markReadChannel,
	// which has room for a single pending value.
	q.insufficientMemChannel = make(chan struct{})
	q.sufficientMemChannel = make(chan struct{})
	q.markReadChannel = make(chan int64, 1)
	q.flushChannel = make(chan struct{})
	q.ctx, q.cancel = context.WithCancel(context.Background())

	// Async supporting processes: two goroutines are started below, matching
	// the count of 2 added to showDownWg. The locker slice (one int32 per
	// segment of the queue capacity) is allocated before they start.
	q.showDownWg.Add(2)
	q.locker = make([]int32, q.QueueCapacitySegments)
	go q.segmentSwapper()
	go q.flush()
	q.ready = true
	return nil
}