func()

in plugins/queue/mmap/queue.go [114:159]


func (q *Queue) Initialize() error {
	// the size of each segment file should be a multiple of the page size.
	pageSize := os.Getpagesize()
	if q.SegmentSize%pageSize != 0 {
		q.SegmentSize -= q.SegmentSize % pageSize
	}
	if q.SegmentSize/pageSize == 0 {
		q.SegmentSize = data4KB
	}
	// the minimum MaxInMemSegments value should be 4.
	if q.MaxInMemSegments < minimumSegments {
		q.MaxInMemSegments = minimumSegments
	}
	q.queueName = q.Name() + "_" + q.PipeName
	// load metadata and override the reading or writing offset by the committed or watermark offset.
	md, err := meta.NewMetaData(q.queueName, q.QueueCapacitySegments)
	if err != nil {
		return fmt.Errorf("error in creating the metadata: %v", err)
	}
	q.meta = md
	cmID, cmOffset := md.GetCommittedOffset()
	wmID, wmOffset := md.GetWatermarkOffset()
	md.PutWritingOffset(wmID, wmOffset)
	md.PutReadingOffset(cmID, cmOffset)
	// keep the reading or writing segments in the memory.
	q.segments = make([]*mmap.File, q.QueueCapacitySegments)
	if _, err := q.GetSegment(cmID); err != nil {
		return err
	}
	if _, err := q.GetSegment(wmID); err != nil {
		return err
	}
	// init components
	q.insufficientMemChannel = make(chan struct{})
	q.sufficientMemChannel = make(chan struct{})
	q.markReadChannel = make(chan int64, 1)
	q.flushChannel = make(chan struct{})
	q.ctx, q.cancel = context.WithCancel(context.Background())
	// async supported processes.
	q.showDownWg.Add(2)
	q.locker = make([]int32, q.QueueCapacitySegments)
	go q.segmentSwapper()
	go q.flush()
	q.ready = true
	return nil
}