lib/dockerregistry/transfer/rw_transferer.go (181 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"context"
"fmt"
"os"
"github.com/uber/kraken/build-index/tagclient"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/origin/blobclient"
"github.com/uber/kraken/utils/closers"
"github.com/uber/kraken/utils/log"
"github.com/docker/distribution/uuid"
"github.com/uber-go/tally"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)
// ReadWriteTransferer is a Transferer for proxy. Uploads/downloads blobs via the
// local origin cluster, and posts/gets tags via the local build-index.
//
// Stat and Download consult the local CAStore cache before contacting the
// origin cluster; blobs downloaded from origin are moved into that cache.
type ReadWriteTransferer struct {
	// stats is the base metrics scope, tagged with module=rwtransferer by
	// NewReadWriteTransferer.
	stats tally.Scope
	// successStats / failureStats are stats further tagged with
	// result=success / result=failure.
	successStats tally.Scope
	failureStats tally.Scope
	// tags is the build-index client used to get, put, and list tags.
	tags tagclient.Client
	// originCluster is used to stat, download, and upload blobs.
	originCluster blobclient.ClusterClient
	// cas is the local content-addressable store: the blob cache, plus
	// upload files used as scratch space while downloading from origin.
	cas *store.CAStore
	// tracer creates the OpenTelemetry spans emitted by this transferer.
	tracer trace.Tracer
}
// NewReadWriteTransferer returns a ReadWriteTransferer that reads and writes
// tags through the given build-index client, transfers blobs through the given
// origin cluster, and caches downloaded blobs in cas.
//
// All metrics are emitted under stats tagged with module=rwtransferer; the
// success/failure scopes additionally carry a result tag.
func NewReadWriteTransferer(
	stats tally.Scope,
	tags tagclient.Client,
	originCluster blobclient.ClusterClient,
	cas *store.CAStore,
) *ReadWriteTransferer {
	moduleScope := stats.Tagged(map[string]string{"module": "rwtransferer"})
	okScope := moduleScope.Tagged(map[string]string{"result": "success"})
	errScope := moduleScope.Tagged(map[string]string{"result": "failure"})

	return &ReadWriteTransferer{
		stats:         moduleScope,
		successStats:  okScope,
		failureStats:  errScope,
		tags:          tags,
		originCluster: originCluster,
		cas:           cas,
		tracer:        otel.Tracer("kraken-registry-transfer"),
	}
}
// Stat returns blob info for d, preferring the local cache and falling back to
// the origin cluster when the blob is not cached.
//
// Cache stat failures other than "not exist" are returned wrapped with %w, so
// callers can inspect the underlying cause via errors.Is / errors.As.
func (t *ReadWriteTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
	fi, err := t.cas.GetCacheFileStat(d.Hex())
	switch {
	case err == nil:
		return core.NewBlobInfo(fi.Size()), nil
	case os.IsNotExist(err):
		return t.originStat(namespace, d)
	default:
		return nil, fmt.Errorf("stat cache file: %w", err)
	}
}
// originStat stats d on the origin cluster.
//
// Every origin error is reported to the caller as ErrBlobNotFound. `docker
// push` stats blobs before uploading them and uploads whatever is reported
// missing. However, if remote blob storage is unavailable the origin answers
// with a 5XX, which would short-circuit the push. Treating that class of error
// as a 404 lets pushes succeed while remote storage is down (write-back will
// eventually persist the blobs). Errors other than blobclient.ErrBlobNotFound
// are logged so they are not silently lost.
func (t *ReadWriteTransferer) originStat(namespace string, d core.Digest) (*core.BlobInfo, error) {
	bi, err := t.originCluster.Stat(namespace, d)
	if err == nil {
		return bi, nil
	}
	if err != blobclient.ErrBlobNotFound {
		// Infof, not Info: the message contains a %s verb, and Info does not
		// interpret format strings.
		log.With("digest", d).Infof("Error stat-ing origin blob: %s", err)
	}
	return nil, ErrBlobNotFound
}
// Download returns a reader for blob d. The blob is served from the local cache
// when present. Otherwise it is first downloaded from the origin cluster into
// the cache.
//
// Returns ErrBlobNotFound when the blob is not cached and the origin cluster
// reports it missing. Cache read failures other than "not exist" are returned
// wrapped with %w. Each call increments the download_requests counter and is
// traced by a client span whose status reflects the final outcome, including
// failures while downloading from origin.
func (t *ReadWriteTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) {
	t.stats.Counter("download_requests").Inc(1)
	ctx, span := t.tracer.Start(context.Background(), "registry.download_blob",
		trace.WithSpanKind(trace.SpanKindClient),
		trace.WithAttributes(
			attribute.String("component", "registry-transfer"),
			attribute.String("operation", "download_blob"),
			attribute.String("namespace", namespace),
			attribute.String("blob.digest", d.Hex()),
		),
	)
	defer span.End()

	blob, err := t.cas.GetCacheFileReader(d.Hex())
	if err == nil {
		span.SetAttributes(attribute.String("cache.status", "hit"))
		span.SetStatus(codes.Ok, "cache hit")
		return blob, nil
	}
	if !os.IsNotExist(err) {
		span.RecordError(err)
		span.SetStatus(codes.Error, "cache read error")
		return nil, fmt.Errorf("get cache file: %w", err)
	}

	span.SetAttributes(attribute.String("cache.status", "miss"))
	blob, err = t.downloadFromOrigin(ctx, namespace, d)
	if err != nil {
		// downloadFromOrigin records the details on its own child span. Mark
		// this request-level span as failed too, so it does not end with an
		// unset status.
		if err == ErrBlobNotFound {
			span.SetStatus(codes.Error, "blob not found")
		} else {
			span.SetStatus(codes.Error, "origin download failed")
		}
		return nil, err
	}
	span.SetStatus(codes.Ok, "download completed")
	return blob, nil
}
// downloadFromOrigin pulls blob d from the origin cluster into the local cache
// and returns a reader over the cached copy.
//
// The blob is written to a freshly created upload file and then moved into
// the cache under d.Hex(). If the move fails because the cache entry already
// exists, the error is ignored and the existing cached copy is returned.
//
// Returns ErrBlobNotFound if the origin cluster reports the blob missing.
// Other failures are returned wrapped with %w.
func (t *ReadWriteTransferer) downloadFromOrigin(ctx context.Context, namespace string, d core.Digest) (store.FileReader, error) {
	ctx, span := t.tracer.Start(ctx, "registry.download_from_origin",
		trace.WithAttributes(
			attribute.String("namespace", namespace),
			attribute.String("blob.digest", d.Hex()),
		),
	)
	defer span.End()

	// The uuid suffix gives each download its own upload file, so concurrent
	// downloads of the same digest write to separate files.
	tmp := fmt.Sprintf("%s.%s", d.Hex(), uuid.Generate().String())
	// NOTE(review): tmp is not removed on the failure paths below; presumably
	// the CAStore's upload-file cleanup reclaims it — confirm.
	if err := t.cas.CreateUploadFile(tmp, 0); err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "failed to create upload file")
		return nil, fmt.Errorf("create upload file: %w", err)
	}
	w, err := t.cas.GetUploadFileReadWriter(tmp)
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "failed to get upload writer")
		return nil, fmt.Errorf("get upload writer: %w", err)
	}
	// w stays open until this function returns, i.e. past the move below.
	defer closers.Close(w)

	if err := t.originCluster.DownloadBlob(ctx, namespace, d, w); err != nil {
		if err == blobclient.ErrBlobNotFound {
			span.SetStatus(codes.Error, "blob not found")
			return nil, ErrBlobNotFound
		}
		span.RecordError(err)
		span.SetStatus(codes.Error, "origin download failed")
		return nil, fmt.Errorf("origin: %w", err)
	}
	// An "exists" error means the digest is already cached (presumably by a
	// concurrent download); the cached copy is read below either way.
	if err := t.cas.MoveUploadFileToCache(tmp, d.Hex()); err != nil && !os.IsExist(err) {
		span.RecordError(err)
		span.SetStatus(codes.Error, "failed to move to cache")
		return nil, fmt.Errorf("move upload file to cache: %w", err)
	}
	blob, err := t.cas.GetCacheFileReader(d.Hex())
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, "failed to read cached blob")
		return nil, fmt.Errorf("get cache file: %w", err)
	}
	span.SetStatus(codes.Ok, "download completed")
	return blob, nil
}
// Upload pushes blob to the origin cluster under namespace and digest d.
//
// Every call increments upload_requests and opens a client span. The outcome
// increments upload_blob on the success or failure scope. Any error from the
// origin client is returned unchanged.
func (t *ReadWriteTransferer) Upload(
	namespace string, d core.Digest, blob store.FileReader,
) error {
	t.stats.Counter("upload_requests").Inc(1)

	spanAttrs := trace.WithAttributes(
		attribute.String("component", "registry-transfer"),
		attribute.String("operation", "upload_blob"),
		attribute.String("namespace", namespace),
		attribute.String("blob.digest", d.Hex()),
	)
	ctx, span := t.tracer.Start(context.Background(), "registry.upload_blob",
		trace.WithSpanKind(trace.SpanKindClient), spanAttrs)
	defer span.End()

	uploadErr := t.originCluster.UploadBlob(ctx, namespace, d, blob)
	if uploadErr == nil {
		t.successStats.Counter("upload_blob").Inc(1)
		span.SetStatus(codes.Ok, "upload completed")
		return nil
	}

	t.failureStats.Counter("upload_blob").Inc(1)
	span.RecordError(uploadErr)
	span.SetStatus(codes.Error, "upload failed")
	return uploadErr
}
// GetTag returns the manifest digest for tag, as reported by the build-index.
//
// Returns ErrTagNotFound when the tag client reports tagclient.ErrTagNotFound.
// Any other client error is returned wrapped with %w.
func (t *ReadWriteTransferer) GetTag(tag string) (core.Digest, error) {
	d, err := t.tags.Get(tag)
	switch {
	case err == nil:
		return d, nil
	case err == tagclient.ErrTagNotFound:
		return core.Digest{}, ErrTagNotFound
	default:
		return core.Digest{}, fmt.Errorf("client get tag: %w", err)
	}
}
// PutTag uploads d as the manifest digest for tag via the build-index client's
// PutAndReplicate. It increments put_tag on the success or failure scope.
// Client errors are returned wrapped with %w.
func (t *ReadWriteTransferer) PutTag(tag string, d core.Digest) error {
	if err := t.tags.PutAndReplicate(tag, d); err != nil {
		t.failureStats.Counter("put_tag").Inc(1)
		return fmt.Errorf("put and replicate tag: %w", err)
	}
	t.successStats.Counter("put_tag").Inc(1)
	return nil
}
// ListTags returns the tags the build-index client lists for prefix. Results
// and errors from the client are passed through unchanged.
func (t *ReadWriteTransferer) ListTags(prefix string) ([]string, error) {
	matching, err := t.tags.List(prefix)
	return matching, err
}