pkg/files/store/store.go
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package store

import (
"context"
"os"
"strconv"
"strings"
"time"
"github.com/azure/peerd/pkg/cache"
pcontext "github.com/azure/peerd/pkg/context"
"github.com/azure/peerd/pkg/discovery/content/reader"
"github.com/azure/peerd/pkg/discovery/routing"
"github.com/azure/peerd/pkg/files"
"github.com/azure/peerd/pkg/metrics"
"github.com/azure/peerd/pkg/urlparser"
"github.com/opencontainers/go-digest"
"github.com/rs/zerolog"
)

// DefaultFileCachePath is the default path of the local file cache.
const DefaultFileCachePath = "/tmp/distribution/peerd/cache"

// NewFilesStore creates a new store.
func NewFilesStore(ctx context.Context, r routing.Router, fileCachePath string) (FilesStore, error) {
fs := &store{
metricsRecorder: metrics.FromContext(ctx),
cache: cache.NewCache(ctx, int64(files.CacheBlockSize), fileCachePath),
prefetchChan: make(chan prefetchableSegment, PrefetchWorkers),
prefetchable: PrefetchWorkers > 0,
router: r,
resolveRetries: ResolveRetries,
resolveTimeout: ResolveTimeout,
blobsChan: make(chan string, 1000),
parser: urlparser.New(),
	}

	// Close the router when the context is cancelled, logging any error.
	go func() {
<-ctx.Done()
err := r.Close()
l := zerolog.Ctx(ctx).Debug()
if err != nil {
l = zerolog.Ctx(ctx).Error().Err(err)
}
l.Msg("router close")
	}()

	// Start the prefetch workers.
	for i := 0; i < PrefetchWorkers; i++ {
go fs.prefetch()
	}

	return fs, nil
}
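
// newDefaultStore is a usage sketch, not part of the original file: it shows how
// a caller might construct the store with the default cache path. The
// routing.Router is assumed to be built elsewhere (e.g. by the discovery setup).
func newDefaultStore(ctx context.Context, r routing.Router) (FilesStore, error) {
	return NewFilesStore(ctx, r, DefaultFileCachePath)
}
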
// prefetchableSegment describes a part of a file to prefetch.
type prefetchableSegment struct {
name string
offset int64
count int
reader reader.Reader
}

// store describes a content store whose contents can come from disk or a remote source.
type store struct {
metricsRecorder metrics.Metrics
cache cache.Cache
prefetchable bool
prefetchChan chan prefetchableSegment
router routing.Router
resolveRetries int
resolveTimeout time.Duration
blobsChan chan string
parser urlparser.Parser
}

// Compile-time check that store implements FilesStore.
var _ FilesStore = &store{}

// Subscribe returns a channel that is notified with the cache key of each chunk added to the store.
func (s *store) Subscribe() chan string {
return s.blobsChan
}
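
// logAdvertisedChunks is a consumer sketch, not part of the original file: it
// drains the subscription channel and logs each advertised chunk key until the
// context is cancelled.
func logAdvertisedChunks(ctx context.Context, s FilesStore) {
	ch := s.Subscribe()
	for {
		select {
		case <-ctx.Done():
			return
		case key := <-ch:
			zerolog.Ctx(ctx).Debug().Str("key", key).Msg("chunk advertised")
		}
	}
}
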
// Open opens the requested file and, if prefetching is enabled, starts prefetching it.
func (s *store) Open(c pcontext.Context) (File, error) {
	chunkKey := c.GetString(pcontext.FileChunkCtxKey)
	tokens := strings.Split(chunkKey, files.FileChunkKeySep)
	if len(tokens) < 2 {
		return nil, os.ErrNotExist // Malformed chunk key.
	}

	name := tokens[0]
	alignedOff, err := strconv.ParseInt(tokens[1], 10, 64)
	if err != nil {
		return nil, err
	}

	log := pcontext.Logger(c)

if pcontext.IsRequestFromAPeer(c) {
// This request came from a peer. Don't serve it unless we have the requested range cached.
if ok := s.cache.Exists(name, alignedOff); !ok {
log.Info().Str("name", name).Msg("peer request not cached")
return nil, os.ErrNotExist
}
	}

	f := &file{
Name: name,
store: s,
cur: 0,
size: 0,
reader: reader.NewReader(c, s.router, s.resolveRetries, s.resolveTimeout, s.metricsRecorder),
	}

	if pcontext.IsRequestFromAPeer(c) {
// Ensure this file can only serve the requested chunk.
// This is to prevent infinite loops when a peer requests a file that is not cached.
f.chunkOffset = alignedOff
	}

	fileSize, err := f.Fstat() // Fstat sets up the file size appropriately.
	if err != nil {
		return nil, err
	}

	if s.prefetchable {
		f.prefetch(0, fileSize)
	}

	return f, nil
}
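
// parseChunkKey is a sketch, not part of the original file: it factors out the
// chunk key parsing done in Open, splitting a key of the form
// "<name><sep><alignedOffset>..." into its leading components.
func parseChunkKey(chunkKey string) (name string, alignedOff int64, ok bool) {
	tokens := strings.Split(chunkKey, files.FileChunkKeySep)
	if len(tokens) < 2 {
		return "", 0, false
	}
	off, err := strconv.ParseInt(tokens[1], 10, 64)
	if err != nil {
		return "", 0, false
	}
	return tokens[0], off, true
}
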
// Key tries to find the cache key for the requested content or returns empty.
func (s *store) Key(c pcontext.Context) (string, digest.Digest, error) {
log := pcontext.Logger(c)
blobUrl := pcontext.BlobUrl(c)
	d, err := s.parser.ParseDigest(blobUrl)
	if err != nil {
		log.Error().Err(err).Msg("store key")
		return "", "", err
	}

startIndex := int64(0) // Default to 0 for HEADs.
if c.Request.Method == "GET" {
startIndex, err = pcontext.RangeStartIndex(c.Request.Header.Get("Range"))
if err != nil {
return "", "", err
}
	}

	key := files.FileChunkKey(d.String(), startIndex, int64(files.CacheBlockSize))
log.Info().Str("digest", d.String()).Str("key", key).Msg("store key")
return key, d, err
}
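
// chunkKeyFor is a sketch, not part of the original file: it shows how a digest
// and a start offset compose into the cache key format produced by Key. Aligning
// the offset down to the cache block size is an assumption, based on the aligned
// offsets that Open parses out of these keys.
func chunkKeyFor(d digest.Digest, offset int64) string {
	blockSize := int64(files.CacheBlockSize)
	aligned := (offset / blockSize) * blockSize // Align down to the block boundary.
	return files.FileChunkKey(d.String(), aligned, blockSize)
}
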
// prefetch is the worker loop that fetches requested segments into the cache
// and advertises each successfully cached chunk.
func (s *store) prefetch() {
for p := range s.prefetchChan {
if _, err := s.cache.GetOrCreate(p.name, p.offset, p.count, func() ([]byte, error) {
return files.FetchFile(p.reader, p.name, p.offset, p.count)
}); err != nil {
p.reader.Log().Error().Err(err).Str("name", p.name).Msg("prefetch failed")
} else {
// Advertise the chunk.
s.blobsChan <- files.FileChunkKey(p.name, p.offset, int64(files.CacheBlockSize))
}
}
}
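
// enqueuePrefetch is a sketch, not part of the original file: it shows how a
// caller could split a byte range into CacheBlockSize segments and hand them to
// the prefetch workers, dropping segments instead of blocking when the queue is
// full.
func (s *store) enqueuePrefetch(name string, r reader.Reader, offset, count int64) {
	blockSize := int64(files.CacheBlockSize)
	for cur := offset; cur < offset+count; cur += blockSize {
		select {
		case s.prefetchChan <- prefetchableSegment{name: name, offset: cur, count: int(blockSize), reader: r}:
		default:
			// Queue is full; skip this segment rather than stall the caller.
		}
	}
}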