plugins/queue/mmap/meta/meta.go (153 lines of code) (raw):

// Licensed to Apache Software Foundation (ASF) under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Apache Software Foundation (ASF) licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. //go:build !windows package meta import ( "fmt" "math" "path/filepath" "sync" "syscall" "github.com/grandecola/mmap" "github.com/apache/skywalking-satellite/plugins/queue/mmap/segment" ) const ( metaSize = 80 metaName = "meta.dat" metaVersion = 1 ) // Offset position const ( versionPos = iota * 8 widPos woffsetPos wmidPos wmoffsetPos cidPos coffsetPos ridPos roffsetPos capacityPos ) // Metadata only needs 80B to store the Metadata for the pipe. But for memory alignment, // it takes at least one memory page size, which is generally 4K. // // [ 8Bit ][ 8Bit ][ 8Bit ][ 8Bit ][ 8Bit ][ 8Bit ][ 8Bit ][ 8Bit ][ 8Bit ][ 8Bit ] // [metaVersion][ ID ][ offset][ ID ][ offset][ ID ][ offset][ ID ][ offset][capacity] // [metaVersion][writing offset][watermark offset][committed offset][reading offset][capacity] type Metadata struct { metaFile *mmap.File name string size int capacity int lock sync.RWMutex } // NewMetaData read or create a Metadata with supported metaVersion func NewMetaData(metaDir string, capacity int) (*Metadata, error) { path := filepath.Join(metaDir, metaName) metaFile, err := segment.NewSegment(path, metaSize) if err != nil { return nil, fmt.Errorf("error in crating the Metadata memory mapped file: %v", err) } m := &Metadata{ metaFile: metaFile, name: metaName, size: metaSize, capacity: capacity, } v := m.GetVersion() if v != 0 && v != metaVersion { return nil, fmt.Errorf("metadata metaVersion is not matching, the Metadata metaVersion is %d", v) } c := m.GetCapacity() if c != 0 && c != capacity { return nil, fmt.Errorf("metadata catapacity is not equal to the old capacity, the old capacity is %d", c) } m.PutVersion(metaVersion) m.PutCapacity(int64(capacity)) return m, nil } // GetVersion returns the meta version. func (m *Metadata) GetVersion() int { m.lock.RLock() defer m.lock.RUnlock() return uint64ToInt(m.metaFile.ReadUint64At(versionPos)) } // PutVersion put the version into the memory mapped file. func (m *Metadata) PutVersion(version int64) { m.lock.Lock() defer m.lock.Unlock() m.metaFile.WriteUint64At(int64ToUint64(version), versionPos) } // GetWritingOffset returns the writing offset, which contains the segment ID and the offset of the segment. func (m *Metadata) GetWritingOffset() (segmentID, offset int64) { m.lock.RLock() defer m.lock.RUnlock() return uint64ToInt64(m.metaFile.ReadUint64At(widPos)), uint64ToInt64(m.metaFile.ReadUint64At(woffsetPos)) } // PutWritingOffset put the segment ID and the offset of the segment into the writing offset. func (m *Metadata) PutWritingOffset(segmentID, offset int64) { m.lock.Lock() defer m.lock.Unlock() m.metaFile.WriteUint64At(int64ToUint64(segmentID), widPos) m.metaFile.WriteUint64At(int64ToUint64(offset), woffsetPos) } // GetWatermarkOffset returns the watermark offset, which contains the segment ID and the offset of the segment. func (m *Metadata) GetWatermarkOffset() (segmentID, offset int64) { m.lock.RLock() defer m.lock.RUnlock() return uint64ToInt64(m.metaFile.ReadUint64At(wmidPos)), uint64ToInt64(m.metaFile.ReadUint64At(wmoffsetPos)) } // PutWatermarkOffset put the segment ID and the offset of the segment into the watermark offset. func (m *Metadata) PutWatermarkOffset(segmentID, offset int64) { m.lock.Lock() defer m.lock.Unlock() m.metaFile.WriteUint64At(int64ToUint64(segmentID), wmidPos) m.metaFile.WriteUint64At(int64ToUint64(offset), wmoffsetPos) } // GetCommittedOffset returns the committed offset, which contains the segment ID and the offset of the segment. func (m *Metadata) GetCommittedOffset() (segmentID, offset int64) { m.lock.RLock() defer m.lock.RUnlock() return uint64ToInt64(m.metaFile.ReadUint64At(cidPos)), uint64ToInt64(m.metaFile.ReadUint64At(coffsetPos)) } // PutCommittedOffset put the segment ID and the offset of the segment into the committed offset. func (m *Metadata) PutCommittedOffset(segmentID, offset int64) { m.lock.Lock() defer m.lock.Unlock() m.metaFile.WriteUint64At(int64ToUint64(segmentID), cidPos) m.metaFile.WriteUint64At(int64ToUint64(offset), coffsetPos) } // GetReadingOffset returns the reading offset, which contains the segment ID and the offset of the segment. func (m *Metadata) GetReadingOffset() (segmentID, offset int64) { m.lock.RLock() defer m.lock.RUnlock() return uint64ToInt64(m.metaFile.ReadUint64At(ridPos)), uint64ToInt64(m.metaFile.ReadUint64At(roffsetPos)) } // PutReadingOffset put the segment ID and the offset of the segment into the reading offset. func (m *Metadata) PutReadingOffset(segmentID, offset int64) { m.lock.Lock() defer m.lock.Unlock() m.metaFile.WriteUint64At(int64ToUint64(segmentID), ridPos) m.metaFile.WriteUint64At(int64ToUint64(offset), roffsetPos) } // GetCapacity returns the capacity of the queue. func (m *Metadata) GetCapacity() int { m.lock.RLock() defer m.lock.RUnlock() return uint64ToInt(m.metaFile.ReadUint64At(capacityPos)) } // PutCapacity put the capacity into the memory mapped file. func (m *Metadata) PutCapacity(version int64) { m.lock.Lock() defer m.lock.Unlock() m.metaFile.WriteUint64At(int64ToUint64(version), capacityPos) } // Flush the memory mapped file to the disk. func (m *Metadata) Flush() error { m.lock.Lock() defer m.lock.Unlock() return m.metaFile.Flush(syscall.MS_SYNC) } // Close do Flush operation and unmap the memory mapped file. func (m *Metadata) Close() error { m.lock.Lock() defer m.lock.Unlock() if err := m.metaFile.Flush(syscall.MS_SYNC); err != nil { return err } return m.metaFile.Unmap() } func uint64ToInt(value uint64) int { if value > math.MaxInt { return 0 } return int(value) } func int64ToUint64(value int64) uint64 { if value < 0 { return 0 } return uint64(value) } func uint64ToInt64(value uint64) int64 { if value > math.MaxInt64 { return 0 } return int64(value) }