pkg/files/store/file.go:

// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package store

import (
	"fmt"
	"io"
	"sync"

	"github.com/azure/peerd/pkg/discovery/content/reader"
	"github.com/azure/peerd/pkg/files"
	"github.com/azure/peerd/pkg/math"
)

var errOnlySingleChunkAvailable = fmt.Errorf("only single chunk available")

// file describes a file that can be read from this content store.
// It implements the File interface. It is similar to os.File.
type file struct {
	Name string // cache key of the file

	cur         int64      // current offset used by Read and Seek
	size        int64      // file size, populated by Fstat
	statLock    sync.Mutex // guards the remote size lookup in Fstat
	chunkOffset int64      // when non-zero, reads are restricted to this single aligned chunk

	reader reader.Reader // reads content that is not already in the cache; also provides the logger
	store  *store        // owning content store: cache and prefetch channel
}

var _ File = &file{}

// prefetch tries to prefetch the specified parts of the file in chunks of files.CacheBlockSize.
// Failures are logged at most and are never surfaced to the caller.
func (f *file) prefetch(offset int64, count int64) {
	go func() {
		fileSize, err := f.Fstat()
		if err != nil {
			return
		}

		// Split the requested range into cache-block-sized segments.
		segs, err := math.NewSegments(offset, files.CacheBlockSize, count, fileSize)
		if err != nil {
			f.reader.Log().Error().Err(err).Msg("prefetch error: failed to create segments")
			return
		}

		// Queue each segment for background prefetching on the store's prefetch channel.
		for seg := range segs.All() {
			f.store.prefetchChan <- prefetchableSegment{
				name:   f.Name,
				reader: f.reader,
				offset: seg.Index,
				count:  seg.Count,
			}
		}
	}()
}
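
// Illustrative only, not part of the upstream file: a caller expecting a
// sequential access pattern could warm the cache for the next couple of cache
// blocks; the offset and count below are hypothetical.
//
//	f.prefetch(f.cur, 2*int64(files.CacheBlockSize))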

// Seek sets the current file offset.
func (f *file) Seek(offset int64, whence int) (int64, error) {
	switch whence {
	case io.SeekCurrent:
		f.cur += offset
	case io.SeekStart:
		f.cur = offset
	case io.SeekEnd:
		// Per the io.Seeker contract, the offset is relative to the end of the file.
		f.cur = f.size + offset
	}
	return f.cur, nil
}
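
// Illustrative only, not part of the upstream file: rewind the handle before
// re-reading it from the beginning. Note that io.SeekEnd is resolved against
// f.size, which Fstat populates.
//
//	if _, err := f.Seek(0, io.SeekStart); err != nil {
//		// handle the error
//	}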

// Fstat returns the size of the file.
func (f *file) Fstat() (int64, error) {
	var hit bool
	f.size, hit = f.store.cache.Size(f.Name)
	if !hit {
		f.reader.Log().Debug().Str("name", f.Name).Int64("size", f.size).Msg("fstat getlen cache miss_1")

		// Double-checked locking: take the lock and re-check the cache so that only
		// one goroutine pays for the remote stat.
		f.statLock.Lock()
		f.size, hit = f.store.cache.Size(f.Name)
		if !hit {
			f.reader.Log().Debug().Str("name", f.Name).Int64("size", f.size).Msg("fstat getlen cache miss_2")

			var err error
			f.size, err = f.reader.FstatRemote()
			if err != nil {
				f.reader.Log().Error().Err(err).Msg("fstat error")
				f.statLock.Unlock()
				return 0, err
			}

			f.store.cache.PutSize(f.Name, f.size)
			f.reader.Log().Debug().Str("name", f.Name).Int64("size", f.size).Msg("fstat putlen")
		}
		f.statLock.Unlock()
	}

	return f.size, nil
}

// Read reads up to len(p) bytes into p. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
func (f *file) Read(p []byte) (n int, err error) {
	ret, err := f.ReadAt(p, f.cur)
	if ret > 0 {
		// Advance the cursor even when ReadAt also reports io.EOF, so that callers
		// following the io.Reader contract see a partial read exactly once.
		f.cur += int64(ret)
	}
	return ret, err
}

// ReadAt reads len(p) bytes from the File starting at byte offset off. It returns the number of bytes read and the error, if any.
func (f *file) ReadAt(buff []byte, offset int64) (int, error) {
	fileSize, err := f.Fstat()
	if err != nil {
		return 0, err
	}

	// Align the requested offset down to the start of its cache block.
	alignedOffset := math.AlignDown(offset, int64(files.CacheBlockSize))
	if f.chunkOffset != 0 && alignedOffset != f.chunkOffset {
		f.reader.Log().Error().Err(errOnlySingleChunkAvailable).Int64("chunk", f.chunkOffset).Int64("alignedOffset", alignedOffset).Int64("requestedOffset", offset).Msg("file can only read chunk")
		return -1, errOnlySingleChunkAvailable
	}

	// Fetch the whole cache block (or the remainder of the file, whichever is smaller),
	// serving it from the cache when possible.
	count := int(math.Min64(int64(files.CacheBlockSize), fileSize-alignedOffset))
	data, err := f.store.cache.GetOrCreate(f.Name, alignedOffset, count, func() ([]byte, error) {
		return files.FetchFile(f.reader, f.Name, alignedOffset, count)
	})
	if err != nil {
		f.reader.Log().Error().Err(err).Msg("readat error")
		return 0, fmt.Errorf("failed to ReadAt, path: %v, offset: %v, error: %w", f.Name, offset, err)
	}

	// Copy the requested window out of the block.
	pos := int(offset - alignedOffset)
	ret := math.Min(len(buff), len(data)-pos)
	ret = copy(buff[:ret], data[pos:pos+ret])

	// Signal EOF when the request extends past the end of the file.
	if offset+int64(len(buff)) > fileSize {
		err = io.EOF
	}
	return ret, err
}
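
// readSequential is an illustrative sketch, not part of the upstream file: it shows
// how Fstat, Seek and Read compose for a whole-file read. It assumes a handle that
// is not pinned to a single chunk (chunkOffset == 0); error handling is minimal.
func readSequential(f *file) ([]byte, error) {
	// Resolve the file size first; Fstat consults the cache before going remote.
	size, err := f.Fstat()
	if err != nil {
		return nil, err
	}

	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return nil, err
	}

	buf := make([]byte, size)
	read := 0
	for read < len(buf) {
		// Read serves at most one cache block per call, so loop until the buffer
		// is full or EOF is reported.
		n, err := f.Read(buf[read:])
		read += n
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, err
		}
	}
	return buf[:read], nil
}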