lib/blobrefresh/refresher.go (112 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package blobrefresh
import (
"errors"
"fmt"
"strings"
"time"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/backend"
"github.com/uber/kraken/lib/backend/backenderrors"
"github.com/uber/kraken/lib/metainfogen"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/utils/dedup"
"github.com/uber/kraken/utils/log"
"github.com/andres-erbsen/clock"
"github.com/c2h5oh/datasize"
"github.com/uber-go/tally"
)
// Refresher errors.
var (
ErrPending = errors.New("download is pending")
ErrNotFound = errors.New("blob not found")
ErrWorkersBusy = errors.New("no workers available")
)
// PostHook runs after the blob has been downloaded within the context of the
// deduplicated request.
type PostHook interface {
Run(d core.Digest)
}
// Refresher deduplicates blob downloads / metainfo generation. Refresher is not
// responsible for tracking whether blobs already exist on disk -- it only provides
// a method for downloading blobs in a deduplicated fashion.
type Refresher struct {
config Config
stats tally.Scope
requests *dedup.RequestCache
cas *store.CAStore
backends *backend.Manager
metaInfoGenerator *metainfogen.Generator
}
// New creates a new Refresher.
func New(
config Config,
stats tally.Scope,
cas *store.CAStore,
backends *backend.Manager,
metaInfoGenerator *metainfogen.Generator) *Refresher {
stats = stats.Tagged(map[string]string{
"module": "blobrefresh",
})
requestsStats := stats.Tagged(map[string]string{
"request_type": "blobrefresh",
})
requests := dedup.NewRequestCache(dedup.RequestCacheConfig{}, clock.New(), requestsStats)
requests.SetNotFound(func(err error) bool { return err == backenderrors.ErrBlobNotFound })
return &Refresher{config, stats, requests, cas, backends, metaInfoGenerator}
}
// Refresh kicks off a background goroutine to download the blob for d from the
// remote backend configured for namespace and generates metainfo for the blob.
// Returns ErrPending if an existing download for the blob is already running.
// Returns ErrNotFound if the blob is not found. Returns ErrWorkersBusy if no
// goroutines are available to run the download.
func (r *Refresher) Refresh(namespace string, d core.Digest, hooks ...PostHook) error {
client, err := r.backends.GetClient(namespace)
if err != nil {
return fmt.Errorf("backend manager: %s", err)
}
// Always check whether the blob is actually available and valid before
// returning a potential pending error. This ensures that the majority of
// errors are propogated quickly and syncronously.
info, err := client.Stat(namespace, d.Hex())
if err != nil {
if err == backenderrors.ErrBlobNotFound {
return ErrNotFound
}
return fmt.Errorf("stat: %s", err)
}
size := datasize.ByteSize(info.Size)
if r.config.SizeLimit > 0 && size > r.config.SizeLimit {
return fmt.Errorf("%s blob exceeds size limit of %s", size, r.config.SizeLimit)
}
id := namespace + ":" + d.Hex()
err = r.requests.Start(id, func() error {
start := time.Now()
pieceLength := r.metaInfoGenerator.GetPieceLength(int64(size))
err := r.download(client, namespace, d, size.Bytes(), pieceLength)
if err != nil {
return err
}
t := time.Since(start)
stats := r.stats.Tagged(map[string]string{"namespace": extractPrefix(namespace)})
stats.Timer("download_remote_blob").Record(t)
stats.Counter("downloads").Inc(1)
log.With(
"namespace", namespace,
"name", d.Hex(),
"download_time", t).Info("Downloaded remote blob")
for _, h := range hooks {
h.Run(d)
}
return nil
})
switch err {
case dedup.ErrRequestPending:
return ErrPending
case backenderrors.ErrBlobNotFound:
return ErrNotFound
case dedup.ErrWorkersBusy:
return ErrWorkersBusy
default:
return err
}
}
func (r *Refresher) download(client backend.Client, namespace string, d core.Digest, size uint64, pieceLength int64) error {
name := d.Hex()
return r.cas.WriteBlobToCacheWithMetaInfo(name, size, func(w store.FileReadWriter) error {
return client.Download(namespace, name, w)
}, pieceLength)
}
// extractPrefix extracts the prefix from the namespace to decrease the cardinality of metrics.
func extractPrefix(namespace string) string {
if prefix, _, ok := strings.Cut(namespace, "/"); ok {
return prefix
}
if prefix, _, ok := strings.Cut(namespace, "_"); ok {
return prefix
}
return namespace
}