lib/store/base/buffer_readwriter.go (91 lines of code) (raw):
// Copyright (c) 2016-2025 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package base
import (
"fmt"
"io"
"github.com/aws/aws-sdk-go/aws"
)
var _ FileReadWriter = &BufferReadWriter{}
// BufferReadWriter implements FileReadWriter interface for in-memory buffering.
type BufferReadWriter struct {
buf *aws.WriteAtBuffer
offset int64
}
// NewBufferReadWriter creates a new BufferReadWriter with an initial capacity of size bytes.
func NewBufferReadWriter(size uint64) *BufferReadWriter {
bytesSlice := make([]byte, 0, size)
buf := aws.NewWriteAtBuffer(bytesSlice)
// Although this is default, this is explicitly set to notify that we are reserving
// only as much capacity as needed
buf.GrowthCoeff = 1
return &BufferReadWriter{
buf: buf,
offset: 0,
}
}
// Write implements io.Writer by using WriteAt with current write offset.
func (b *BufferReadWriter) Write(p []byte) (n int, err error) {
n, err = b.buf.WriteAt(p, b.offset)
b.offset += int64(n)
return n, err
}
// WriteAt implements io.WriterAt for parallel writes.
func (b *BufferReadWriter) WriteAt(p []byte, off int64) (n int, err error) {
if off < 0 {
return 0, fmt.Errorf("negative offset")
}
return b.buf.WriteAt(p, off)
}
// Read implements io.Reader for sequential reads.
func (b *BufferReadWriter) Read(p []byte) (n int, err error) {
bufBytes := b.buf.Bytes()
if b.offset >= int64(len(bufBytes)) {
return 0, io.EOF
}
n = copy(p, bufBytes[b.offset:])
b.offset += int64(n)
if n < len(p) {
err = io.EOF
}
return n, err
}
// ReadAt implements io.ReaderAt.
func (b *BufferReadWriter) ReadAt(p []byte, off int64) (n int, err error) {
if off < 0 {
return 0, fmt.Errorf("negative offset")
}
bufBytes := b.buf.Bytes()
if off >= int64(len(bufBytes)) {
return 0, io.EOF
}
n = copy(p, bufBytes[off:])
if n < len(p) {
err = io.EOF
}
return n, err
}
// Seek implements io.Seeker.
func (b *BufferReadWriter) Seek(offset int64, whence int) (int64, error) {
var newOffset int64
bufSize := int64(len(b.buf.Bytes()))
switch whence {
case io.SeekStart:
newOffset = offset
case io.SeekCurrent:
newOffset = b.offset + offset
case io.SeekEnd:
newOffset = bufSize + offset
default:
return 0, fmt.Errorf("invalid whence: %d", whence)
}
if newOffset < 0 {
return 0, fmt.Errorf("negative position: %d", newOffset)
}
b.offset = newOffset
return newOffset, nil
}
// Close is no-op
func (b *BufferReadWriter) Close() error {
return nil
}
// Size returns the size of the buffer
func (b *BufferReadWriter) Size() int64 {
return int64(len(b.buf.Bytes()))
}
// Cancel is no-op
func (b *BufferReadWriter) Cancel() error {
return nil
}
// Commit is no-op
func (b *BufferReadWriter) Commit() error {
return nil
}
// Bytes returns the full buffer
func (b *BufferReadWriter) Bytes() []byte {
return b.buf.Bytes()
}