lib/torrent/storage/agentstorage/torrent_archive.go (93 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agentstorage
import (
"fmt"
"os"
"time"
"github.com/uber-go/tally"
"github.com/willf/bitset"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/store"
"github.com/uber/kraken/lib/store/metadata"
"github.com/uber/kraken/lib/torrent/observability"
"github.com/uber/kraken/lib/torrent/storage"
"github.com/uber/kraken/tracker/metainfoclient"
)
// TorrentArchive is capable of initializing torrents in the download directory
// and serving torrents from either the download or cache directory.
type TorrentArchive struct {
stats tally.Scope
cads *store.CADownloadStore
metaInfoClient metainfoclient.Client
}
// NewTorrentArchive creates a new TorrentArchive.
func NewTorrentArchive(
stats tally.Scope,
cads *store.CADownloadStore,
mic metainfoclient.Client) *TorrentArchive {
stats = stats.Tagged(map[string]string{
"module": "agenttorrentarchive",
})
return &TorrentArchive{stats, cads, mic}
}
// Stat returns TorrentInfo for the given digest. Returns os.ErrNotExist if the
// file does not exist. Ignores namespace.
func (a *TorrentArchive) Stat(namespace string, d core.Digest) (*storage.TorrentInfo, error) {
var tm metadata.TorrentMeta
if err := a.cads.Any().GetMetadata(d.Hex(), &tm); err != nil {
return nil, err
}
var psm pieceStatusMetadata
if err := a.cads.Any().GetMetadata(d.Hex(), &psm); err != nil {
return nil, err
}
b := bitset.New(uint(len(psm.pieces)))
for i, p := range psm.pieces {
if p.status == _complete {
b.Set(uint(i))
}
}
return storage.NewTorrentInfo(tm.MetaInfo, b), nil
}
// CreateTorrent returns a Torrent for either an existing metainfo / file on
// disk, or downloads metainfo and initializes the file. Returns ErrNotFound
// if no metainfo was found.
func (a *TorrentArchive) CreateTorrent(namespace string, d core.Digest) (storage.Torrent, error) {
var tm metadata.TorrentMeta
if err := a.cads.Any().GetMetadata(d.Hex(), &tm); os.IsNotExist(err) {
startTime := time.Now()
mi, err := a.metaInfoClient.Download(namespace, d)
if err != nil {
if err == metainfoclient.ErrNotFound {
return nil, storage.ErrNotFound
}
return nil, fmt.Errorf("download metainfo: %s", err)
}
metainfoDownloadLatency := time.Since(startTime)
observability.EmitDownloadPerformance(a.stats, observability.METAINFO_DOWNLOAD, mi.Length(), metainfoDownloadLatency)
// There's a race condition here, but it's "okay"... Basically, we could
// initialize a download file with metainfo that is rejected by file store,
// because someone else beats us to it. However, we catch a lucky break
// because the only piece of metainfo we use is file length -- which digest
// is derived from, so it's "okay".
createErr := a.cads.CreateDownloadFile(mi.Digest().Hex(), mi.Length())
if createErr != nil &&
!a.cads.InDownloadError(createErr) && !a.cads.InCacheError(createErr) {
return nil, fmt.Errorf("create download file: %s", createErr)
}
tm.MetaInfo = mi
if err := a.cads.Any().GetOrSetMetadata(d.Hex(), &tm); err != nil {
return nil, fmt.Errorf("get or set metainfo: %s", err)
}
} else if err != nil {
return nil, fmt.Errorf("get metainfo: %s", err)
}
t, err := NewTorrent(a.cads, tm.MetaInfo)
if err != nil {
return nil, fmt.Errorf("initialize torrent: %s", err)
}
return t, nil
}
// GetTorrent returns a Torrent for an existing metainfo / file on disk. Ignores namespace.
func (a *TorrentArchive) GetTorrent(namespace string, d core.Digest) (storage.Torrent, error) {
var tm metadata.TorrentMeta
if err := a.cads.Any().GetMetadata(d.Hex(), &tm); err != nil {
return nil, fmt.Errorf("get metainfo: %s", err)
}
t, err := NewTorrent(a.cads, tm.MetaInfo)
if err != nil {
return nil, fmt.Errorf("initialize torrent: %s", err)
}
return t, nil
}
// DeleteTorrent deletes a torrent from disk.
func (a *TorrentArchive) DeleteTorrent(d core.Digest) error {
if err := a.cads.Any().DeleteFile(d.Hex()); err != nil && !os.IsNotExist(err) {
return err
}
return nil
}