internal/ringbuffer/buffer.go (95 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package ringbuffer

import (
	"bytes"
	"encoding/binary"
	"io"
	"io/ioutil"
)

// BlockHeaderSize is the size of the block header, in bytes:
// one byte for the tag followed by four bytes for the data size.
const BlockHeaderSize = 5

// BlockTag is a block tag, which can be used for classification.
type BlockTag uint8

// BlockHeader holds a fixed-size block header.
type BlockHeader struct {
	// Tag is the block's tag.
	Tag BlockTag

	// Size is the size of the block data, in bytes.
	Size uint32
}

// Buffer is a ring buffer of byte blocks.
//
// Each block is stored as an encoded BlockHeader immediately followed
// by the block's data; either part may wrap around the end of buf.
type Buffer struct {
	// buf is the fixed-size backing storage; its length is the capacity.
	buf []byte

	// headerbuf is scratch space used to encode and decode block headers.
	headerbuf [BlockHeaderSize]byte

	// len is the number of bytes of buf in use, including block headers.
	len int

	// write is the offset into buf at which the next block will be written.
	write int

	// read is the offset into buf of the oldest block's header.
	read int

	// Evicted will be called when an old block is evicted to make place for a new one.
	Evicted func(BlockHeader)
}

// New returns a new Buffer with the given size in bytes.
//
// The returned Buffer's Evicted callback is initialised to a no-op.
// size must be non-negative, as it is passed directly to make.
func New(size int) *Buffer {
	return &Buffer{
		buf:     make([]byte, size),
		Evicted: func(BlockHeader) {},
	}
}

// Len returns the number of bytes currently in the buffer, including
// block-accounting bytes.
func (b *Buffer) Len() int {
	return b.len
}

// Cap returns the capacity of the buffer.
func (b *Buffer) Cap() int {
	return len(b.buf)
}

// WriteBlockTo writes the oldest block in b to w, returning the block
// header and the number of bytes written to w.
func (b *Buffer) WriteBlockTo(w io.Writer) (header BlockHeader, written int64, err error) { if b.len == 0 { return header, 0, io.EOF } if n := copy(b.headerbuf[:], b.buf[b.read:]); n < len(b.headerbuf) { b.read = copy(b.headerbuf[n:], b.buf[:]) } else { b.read = (b.read + n) % b.Cap() } b.len -= len(b.headerbuf) header.Tag = BlockTag(b.headerbuf[0]) header.Size = binary.LittleEndian.Uint32(b.headerbuf[1:]) size := int(header.Size) if b.read+size > b.Cap() { tail := b.buf[b.read:] n, err := w.Write(tail) if err != nil { b.read = (b.read + size) % b.Cap() b.len -= size + len(b.headerbuf) return header, int64(n), err } size -= n written = int64(n) b.read = 0 b.len -= n } n, err := w.Write(b.buf[b.read : b.read+size]) if err != nil { return header, written + int64(n), err } written += int64(n) b.read = (b.read + size) % b.Cap() b.len -= size return header, written, nil } // WriteBlock writes p as a block to b, with tag t. // // If len(p)+BlockHeaderSize > b.Cap(), bytes.ErrTooLarge will be returned. // If the buffer does not currently have room for the block, then the // oldest blocks will be evicted until enough room is available. func (b *Buffer) WriteBlock(p []byte, tag BlockTag) (int, error) { lenp := len(p) if lenp+BlockHeaderSize > b.Cap() { return 0, bytes.ErrTooLarge } for lenp+BlockHeaderSize > b.Cap()-b.Len() { header, _, err := b.WriteBlockTo(ioutil.Discard) if err != nil { return 0, err } b.Evicted(header) } b.headerbuf[0] = uint8(tag) binary.LittleEndian.PutUint32(b.headerbuf[1:], uint32(lenp)) if n := copy(b.buf[b.write:], b.headerbuf[:]); n < len(b.headerbuf) { b.write = copy(b.buf, b.headerbuf[n:]) } else { b.write = (b.write + n) % b.Cap() } if n := copy(b.buf[b.write:], p); n < lenp { b.write = copy(b.buf, p[n:]) } else { b.write = (b.write + n) % b.Cap() } b.len += lenp + BlockHeaderSize return lenp, nil }