plugins/queue/mmap/segment_operation.go (103 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:build !windows package mmap import ( "context" "fmt" "path" "strconv" "sync/atomic" "syscall" "github.com/grandecola/mmap" "github.com/apache/skywalking-satellite/internal/pkg/log" "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment" ) // GetSegment returns a memory mapped file at the segmentID position. func (q *Queue) GetSegment(segmentID int64) (*mmap.File, error) { if atomic.LoadInt32(&q.mmapCount) >= q.MaxInMemSegments { q.insufficientMemChannel <- struct{}{} <-q.sufficientMemChannel } if err := q.mapSegment(segmentID); err != nil { return nil, err } index := q.GetIndex(segmentID) if q.segments[index] != nil { return q.segments[index], nil } return nil, fmt.Errorf("cannot get a memory mapped file at %d segment", segmentID) } // mapSegment load the segment file reference to the segments. func (q *Queue) mapSegment(segmentID int64) error { index := q.GetIndex(segmentID) if q.segments[index] != nil { return nil } filePath := path.Join(q.queueName, strconv.Itoa(index)+segment.FileSuffix) file, err := segment.NewSegment(filePath, q.SegmentSize) if err != nil { return err } atomic.AddInt32(&q.mmapCount, 1) q.segments[index] = file return nil } // unmapSegment cancel the memory mapped status. func (q *Queue) unmapSegment(segmentID int64) error { index := q.GetIndex(segmentID) if q.segments[index] == nil { return nil } if err := q.segments[index].Flush(syscall.MS_SYNC); err != nil { return fmt.Errorf("error in flush segemnt when unmapping: %v", err) } if err := q.segments[index].Unmap(); err != nil { return fmt.Errorf("error in unmap segemnt: %v", err) } atomic.AddInt32(&q.mmapCount, -1) q.segments[index] = nil return nil } // segmentSwapper run with a go routine to ensure the memory cost. func (q *Queue) segmentSwapper() { defer q.showDownWg.Done() ctx, _ := context.WithCancel(q.ctx) // nolint for { select { case id := <-q.markReadChannel: q.lock(id) if q.unmapSegment(id) != nil { log.Logger.Errorf("cannot unmap the markread segment: %d", id) } q.unlock(id) case <-q.insufficientMemChannel: if q.mmapCount >= q.MaxInMemSegments { if q.doSwap() != nil { log.Logger.Errorf("cannot get enough memory to receive new data") } } q.sufficientMemChannel <- struct{}{} case <-ctx.Done(): return } } } // doSwap swap the memory mapped files to normal files to promise the memory resources cost. func (q *Queue) doSwap() error { rID, _ := q.meta.GetReadingOffset() wID, _ := q.meta.GetWritingOffset() logicWID := wID + int64(q.QueueCapacitySegments) wIndex := q.GetIndex(wID) rIndex := q.GetIndex(rID) // only clear all memory-mapped file when more than 1.5 times MaxInMemSegments. clearAll := (wID - rID + 1) > int64(q.MaxInMemSegments)*3/2 for q.mmapCount >= q.MaxInMemSegments { for i := logicWID - 1; i >= 0 && i >= logicWID-int64(q.MaxInMemSegments); i-- { if q.GetIndex(i) == wIndex || q.GetIndex(i) == rIndex { continue } if err := q.unmapSegment(i); err != nil { return err } // the writing segment and the reading segment should still in memory. // q.MaxInMemSegments/2-1 means keeping half available spaces to receive new data. if !clearAll && q.MaxInMemSegments-q.mmapCount >= q.MaxInMemSegments/2-1 { return nil } } } return nil } // GetIndex returns the index of the segments. func (q *Queue) GetIndex(segmentID int64) int { return int(segmentID) % q.QueueCapacitySegments }