// pkg/cached/cached.go
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package cached
import (
"context"
"io"
"sync"
"github.com/googlecloudplatform/pi-delivery/pkg/resultset"
)
// cacheSize is the maximum number of leading result bytes kept in memory
// per radix.
const cacheSize = 1 * 1024 * 1024 // 1 MiB

// radixes maps each supported radix (decimal, hexadecimal) to its slot
// index in _cache.
var radixes = map[int]int{10: 0, 16: 1}

// cache is a lazily allocated, append-only byte cache shared by all
// CachedReaders of the same radix.
type cache struct {
	once  sync.Once    // one-time allocation of the backing slice (see NewCachedReader)
	lock  sync.RWMutex // guards reads/appends of the cache field below
	cache []byte       // cached prefix of the result set; grows up to cacheSize, never shrinks
}

// _cache holds one shared cache per entry in radixes.
var _cache = make([]cache, len(radixes))
// UpstreamReader is the reader CachedReader reads from. It combines
// sequential and positional reads with access to the result set the
// bytes belong to.
type UpstreamReader interface {
	io.ReadSeeker
	io.ReaderAt
	// ResultSet returns the upstream result set.
	ResultSet() resultset.ResultSet
}
// CachedReader provides a cache support for up to the first cacheSize bytes
// on top of the UpstreamReader.
type CachedReader struct {
	off int64          // current Read offset; kept in sync with rd via Seek
	rd  UpstreamReader // upstream source of packed results
	// NOTE(review): ctx is stored but never read by any method in this
	// file, and storing a Context in a struct is discouraged by Go
	// convention — confirm with callers before removing.
	ctx   context.Context
	cache *cache // shared per-radix cache, selected in NewCachedReader
}

// Compile-time checks that CachedReader satisfies the io interfaces.
var _ io.ReadSeeker = new(CachedReader)
var _ io.ReaderAt = new(CachedReader)
// NewCachedReader returns a new CachedReader for upstream rd.
func NewCachedReader(ctx context.Context, rd UpstreamReader) *CachedReader {
	// Pick the shared cache slot for this result set's radix and allocate
	// its backing storage exactly once across all readers.
	c := &_cache[radixes[rd.ResultSet().Radix()]]
	c.once.Do(func() { c.cache = make([]byte, 0, cacheSize) })
	return &CachedReader{
		ctx:   ctx,
		rd:    rd,
		cache: c,
		// off starts at the zero value (0).
	}
}
// ReadAt reads len(p) bytes of packed results from offset off.
func (r *CachedReader) ReadAt(p []byte, off int64) (int, error) {
	var done int
	if cached, hit := r.readCache(p, off); hit {
		done = cached
		if done == len(p) {
			// Fully served from the cache.
			return done, nil
		}
	}
	// Fetch the remainder from upstream and opportunistically extend the
	// shared cache with what we just read.
	m, err := r.rd.ReadAt(p[done:], off+int64(done))
	r.updateCache(p[done:done+m], off+int64(done))
	return done + m, err
}
// Read reads len(p) bytes of packed results from the current offset.
//
// On a cache hit the upstream reader is advanced by the number of bytes
// served from the cache so both positions stay in sync.
func (r *CachedReader) Read(p []byte) (int, error) {
	if n, ok := r.readCache(p, r.off); ok {
		// Keep the upstream position in sync with ours. The error was
		// previously discarded, which would silently desync r.off from
		// the upstream offset and corrupt every subsequent read.
		if _, err := r.Seek(int64(n), io.SeekCurrent); err != nil {
			return n, err
		}
		return n, nil
	}
	n, err := r.rd.Read(p)
	// Cache the bytes we just read before advancing the offset.
	r.updateCache(p[:n], r.off)
	r.off += int64(n)
	return n, err
}
// Seek updates the offset for the next Read.
func (r *CachedReader) Seek(offset int64, whence int) (int64, error) {
	// Delegate to upstream, then mirror whatever position it reports so
	// Read's cache lookups use the same offset.
	pos, err := r.rd.Seek(offset, whence)
	r.off = pos
	return pos, err
}
// readCache copies cached bytes starting at offset into p. It reports
// the number of bytes copied and whether the cache held any data at that
// offset; a partial copy (less than len(p)) still returns true.
func (r *CachedReader) readCache(p []byte, offset int64) (int, bool) {
	// A negative offset would panic on the slice expression below.
	// Report a miss instead so the upstream reader can return the
	// appropriate error for an invalid offset.
	if offset < 0 {
		return 0, false
	}
	r.cache.lock.RLock()
	defer r.cache.lock.RUnlock()
	if int64(len(r.cache.cache)) <= offset {
		return 0, false
	}
	return copy(p, r.cache.cache[offset:]), true
}
// updateCache appends the tail of p to the shared cache when p is
// contiguous with the cached prefix — i.e. when offset falls inside or
// exactly at the end of the cached bytes and below the cache's fixed
// capacity. Anything that would leave a gap is dropped.
func (r *CachedReader) updateCache(p []byte, offset int64) {
	// This is lazy so just update if the data is contiguous.
	// Check boundaries with a read lock first.
	r.cache.lock.RLock()
	if int64(len(r.cache.cache)) < offset ||
		int64(cap(r.cache.cache)) <= offset {
		// Either a gap would form (offset past the current end) or the
		// offset is beyond what the cache can ever hold.
		r.cache.lock.RUnlock()
		return
	}
	r.cache.lock.RUnlock()
	// Re-acquire as a writer. The cache may have grown between the two
	// locks (it only ever grows, never shrinks), so overlap and n are
	// re-derived below under the write lock to stay correct.
	r.cache.lock.Lock()
	defer r.cache.lock.Unlock()
	// overlap: how many leading bytes of p are already cached.
	overlap := len(r.cache.cache) - int(offset)
	// n: how many new bytes p contributes past the cached end.
	n := len(p) - overlap
	if overlap >= 0 && n > 0 {
		// Clamp to the remaining capacity so append never reallocates
		// the shared backing array past cacheSize.
		if len(r.cache.cache)+n > cap(r.cache.cache) {
			n = cap(r.cache.cache) - len(r.cache.cache)
		}
		r.cache.cache = append(r.cache.cache, p[overlap:overlap+n]...)
	}
}
// ResultSet returns the upstream ResultSet.
func (r *CachedReader) ResultSet() resultset.ResultSet {
	// Pure delegation to the wrapped upstream reader.
	rs := r.rd.ResultSet()
	return rs
}