lib/dockerregistry/transfer/ro_transferer.go (82 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package transfer
import (
"errors"
"fmt"
"os"
"github.com/uber-go/tally"
"github.com/uber/kraken/build-index/tagclient"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/torrent/scheduler"
"github.com/uber/kraken/utils/memsize"
)
var _ ImageTransferer = (*ReadOnlyTransferer)(nil)
// ReadOnlyTransferer gets and posts manifest to tracker, and transfers blobs as torrent.
type ReadOnlyTransferer struct {
stats tally.Scope
cads *store.CADownloadStore
tags tagclient.Client
sched scheduler.Scheduler
}
// NewReadOnlyTransferer creates a new ReadOnlyTransferer.
func NewReadOnlyTransferer(
stats tally.Scope,
cads *store.CADownloadStore,
tags tagclient.Client,
sched scheduler.Scheduler) *ReadOnlyTransferer {
stats = stats.Tagged(map[string]string{
"module": "rotransferer",
})
return &ReadOnlyTransferer{stats, cads, tags, sched}
}
// Stat returns blob info from local cache, and triggers download if the blob is
// not available locally.
func (t *ReadOnlyTransferer) Stat(namespace string, d core.Digest) (*core.BlobInfo, error) {
fi, err := t.cads.Cache().GetFileStat(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
fi, err = t.cads.Cache().GetFileStat(d.Hex())
if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)
}
} else if err != nil {
return nil, fmt.Errorf("stat cache: %s", err)
}
return core.NewBlobInfo(fi.Size()), nil
}
// Download downloads blobs as torrent.
func (t *ReadOnlyTransferer) Download(namespace string, d core.Digest) (store.FileReader, error) {
f, err := t.cads.Cache().GetFileReader(d.Hex())
if os.IsNotExist(err) || t.cads.InDownloadError(err) {
if err := t.sched.Download(namespace, d); err != nil {
return nil, fmt.Errorf("scheduler: %s", err)
}
f, err = t.cads.Cache().GetFileReader(d.Hex())
if err != nil {
return nil, fmt.Errorf("cache: %s", err)
}
} else if err != nil {
return nil, fmt.Errorf("cache: %s", err)
}
mbServed := int64(uint64(f.Size()) / memsize.MB)
t.stats.Counter("mb_served").Inc(mbServed)
return f, nil
}
// Upload uploads blobs to a torrent network.
func (t *ReadOnlyTransferer) Upload(namespace string, d core.Digest, blob store.FileReader) error {
return errors.New("unsupported operation")
}
// GetTag gets manifest digest for tag.
func (t *ReadOnlyTransferer) GetTag(tag string) (core.Digest, error) {
d, err := t.tags.Get(tag)
if err != nil {
if err == tagclient.ErrTagNotFound {
t.stats.Counter("tag_not_found").Inc(1)
return core.Digest{}, ErrTagNotFound
}
t.stats.Counter("get_tag_error").Inc(1)
return core.Digest{}, fmt.Errorf("client get tag: %s", err)
}
return d, nil
}
// PutTag is not supported.
func (t *ReadOnlyTransferer) PutTag(tag string, d core.Digest) error {
return errors.New("not supported")
}
// ListTags is not supported.
func (t *ReadOnlyTransferer) ListTags(prefix string) ([]string, error) {
return nil, errors.New("not supported")
}