lib/torrent/storage/agentstorage/torrent.go (211 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package agentstorage

import (
	"errors"
	"fmt"
	"io"
	"os"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/lib/store"
	"github.com/uber/kraken/lib/torrent/storage"
	"github.com/uber/kraken/lib/torrent/storage/piecereader"
	"github.com/uber/kraken/utils/log"

	"github.com/willf/bitset"
	"go.uber.org/atomic"
)

var (
	// errPieceNotComplete is returned by GetPieceReader when the requested
	// piece has not been marked complete.
	errPieceNotComplete = errors.New("piece not complete")

	// errWritePieceConflict is returned by WritePiece when the piece is
	// already dirty, i.e. another writer has claimed it.
	errWritePieceConflict = errors.New("piece is already being written to")
)

// caDownloadStore defines the CADownloadStore methods which Torrent requires.
// Useful for testing purposes, where we need to mock certain methods.
type caDownloadStore interface {
	// MoveDownloadFileToCache is invoked by Torrent once every piece of the
	// file called name is complete. Torrent treats an error satisfying
	// os.IsExist as success.
	MoveDownloadFileToCache(name string) error

	// GetDownloadFileReadWriter opens the download file called name. Torrent
	// seeks within the returned handle and copies piece data into it.
	GetDownloadFileReadWriter(name string) (store.FileReadWriter, error)

	// Any returns the scope Torrent uses to open readers for complete pieces.
	Any() *store.CADownloadStoreScope

	// Download returns the scope Torrent uses to persist per-piece status
	// metadata.
	Download() *store.CADownloadStoreScope

	// InCacheError is not used by the code visible in this file.
	// NOTE(review): presumably reports whether the error means the file is
	// already in the cache -- confirm against the store package.
	InCacheError(error) bool
}

// Torrent implements a Torrent on top of an AgentFileStore.
// It allows concurrent writes on distinct pieces, and concurrent reads on all
// pieces. Behavior is undefined if multiple Torrent instances are backed
// by the same file store and metainfo.
type Torrent struct {
	// metaInfo supplies the digest, info hash, total length, and per-piece
	// lengths and checksums.
	metaInfo *core.MetaInfo

	// cads is the backing store for the download file and piece metadata.
	cads caDownloadStore

	// pieces holds the state of each piece, indexed by piece index.
	pieces []*piece

	// numComplete counts the pieces which have been marked complete.
	numComplete *atomic.Int32

	// committed is set once the download file has been moved to the cache.
	committed *atomic.Bool
}

// NewTorrent creates a new Torrent.
func NewTorrent(cads caDownloadStore, mi *core.MetaInfo) (*Torrent, error) { pieces, numComplete, err := restorePieces(mi.Digest(), cads, mi.NumPieces()) if err != nil { return nil, fmt.Errorf("restore pieces: %s", err) } committed := false if numComplete == len(pieces) { if err := cads.MoveDownloadFileToCache(mi.Digest().Hex()); err != nil && !os.IsExist(err) { return nil, fmt.Errorf("move file to cache: %s", err) } committed = true } return &Torrent{ cads: cads, metaInfo: mi, pieces: pieces, numComplete: atomic.NewInt32(int32(numComplete)), committed: atomic.NewBool(committed), }, nil } // Digest returns the digest of the target blob. func (t *Torrent) Digest() core.Digest { return t.metaInfo.Digest() } // Stat returns the storage.TorrentInfo for t. func (t *Torrent) Stat() *storage.TorrentInfo { return storage.NewTorrentInfo(t.metaInfo, t.Bitfield()) } // InfoHash returns the torrent metainfo hash. func (t *Torrent) InfoHash() core.InfoHash { return t.metaInfo.InfoHash() } // NumPieces returns the number of pieces in the torrent. func (t *Torrent) NumPieces() int { return len(t.pieces) } // Length returns the length of the target file. func (t *Torrent) Length() int64 { return t.metaInfo.Length() } // PieceLength returns the length of piece pi. func (t *Torrent) PieceLength(pi int) int64 { return t.metaInfo.GetPieceLength(pi) } // MaxPieceLength returns the longest piece length of the torrent. func (t *Torrent) MaxPieceLength() int64 { return t.PieceLength(0) } // Complete indicates whether the torrent is complete or not. Completeness is // defined by whether the torrent file has been committed to the cache directory. func (t *Torrent) Complete() bool { return t.committed.Load() } // BytesDownloaded returns an estimate of the number of bytes downloaded in the // torrent. 
func (t *Torrent) BytesDownloaded() int64 { return min(int64(t.numComplete.Load())*t.metaInfo.PieceLength(), t.metaInfo.Length()) } // Bitfield returns the bitfield of pieces where true denotes a complete piece // and false denotes an incomplete piece. func (t *Torrent) Bitfield() *bitset.BitSet { bitfield := bitset.New(uint(len(t.pieces))) for i, p := range t.pieces { if p.complete() { bitfield.Set(uint(i)) } } return bitfield } func (t *Torrent) String() string { downloaded := int(float64(t.BytesDownloaded()) / float64(t.metaInfo.Length()) * 100) return fmt.Sprintf( "torrent(name=%s, hash=%s, downloaded=%d%%)", t.Digest().Hex(), t.InfoHash().Hex(), downloaded) } func (t *Torrent) getPiece(pi int) (*piece, error) { if pi >= len(t.pieces) { return nil, fmt.Errorf("invalid piece index %d: num pieces = %d", pi, len(t.pieces)) } return t.pieces[pi], nil } // markPieceComplete must only be called once per piece. func (t *Torrent) markPieceComplete(pi int) error { updated, err := t.cads.Download().SetMetadataAt( t.Digest().Hex(), &pieceStatusMetadata{}, []byte{byte(_complete)}, int64(pi)) if err != nil { return fmt.Errorf("write piece metadata: %s", err) } if !updated { // This could mean there's another thread with a Torrent instance using // the same file as us. log.Errorf( "Invariant violation: piece marked complete twice: piece %d in %s", pi, t.Digest().Hex()) } t.pieces[pi].markComplete() t.numComplete.Inc() return nil } // writePiece writes data to piece pi. If the write succeeds, marks the piece as completed. func (t *Torrent) writePiece(src storage.PieceReader, pi int) error { f, err := t.cads.GetDownloadFileReadWriter(t.metaInfo.Digest().Hex()) if err != nil { return fmt.Errorf("get download writer: %s", err) } defer f.Close() h := core.PieceHash() r := io.TeeReader(src, h) // Calculates piece sum as we write to file. 
if _, err := f.Seek(t.getFileOffset(pi), 0); err != nil { return fmt.Errorf("seek: %s", err) } if _, err := io.Copy(f, r); err != nil { return fmt.Errorf("copy: %s", err) } if h.Sum32() != t.metaInfo.GetPieceSum(pi) { return errors.New("invalid piece sum") } if err := t.markPieceComplete(pi); err != nil { return fmt.Errorf("mark piece complete: %s", err) } return nil } // WritePiece writes data to piece pi. func (t *Torrent) WritePiece(src storage.PieceReader, pi int) error { piece, err := t.getPiece(pi) if err != nil { return err } if int64(src.Length()) != t.PieceLength(pi) { return fmt.Errorf( "invalid piece length: expected %d, got %d", t.PieceLength(pi), src.Length()) } // Exit quickly if the piece is not writable. if piece.complete() { return storage.ErrPieceComplete } if piece.dirty() { return errWritePieceConflict } dirty, complete := piece.tryMarkDirty() if dirty { return errWritePieceConflict } else if complete { return storage.ErrPieceComplete } // At this point, we've determined that the piece is not complete and ensured // we are the only thread which may write the piece. We do not block other // threads from checking if the piece is writable. if err := t.writePiece(src, pi); err != nil { // Allow other threads to write this piece since we mysteriously failed. piece.markEmpty() return fmt.Errorf("write piece: %s", err) } if int(t.numComplete.Load()) == len(t.pieces) { // Multiple threads may attempt to move the download file to cache, however // only one will succeed while the others will receive (and ignore) file exist // error. 
err := t.cads.MoveDownloadFileToCache(t.metaInfo.Digest().Hex()) if err != nil && !os.IsExist(err) { return fmt.Errorf("download completed but failed to move file to cache directory: %s", err) } t.committed.Store(true) } return nil } type opener struct { torrent *Torrent } func (o *opener) Open() (store.FileReader, error) { return o.torrent.cads.Any().GetFileReader(o.torrent.Digest().Hex()) } // GetPieceReader returns a reader for piece pi. func (t *Torrent) GetPieceReader(pi int) (storage.PieceReader, error) { piece, err := t.getPiece(pi) if err != nil { return nil, err } if !piece.complete() { return nil, errPieceNotComplete } return piecereader.NewFileReader(t.getFileOffset(pi), t.PieceLength(pi), &opener{t}), nil } // HasPiece returns if piece pi is complete. func (t *Torrent) HasPiece(pi int) bool { piece, err := t.getPiece(pi) if err != nil { return false } return piece.complete() } // MissingPieces returns the indeces of all missing pieces. func (t *Torrent) MissingPieces() []int { var missing []int for i, p := range t.pieces { if !p.complete() { missing = append(missing, i) } } return missing } // getFileOffset calculates the offset in the torrent file given piece index. // Assumes pi is a valid piece index. func (t *Torrent) getFileOffset(pi int) int64 { return t.metaInfo.PieceLength() * int64(pi) } func min(a, b int64) int64 { if a < b { return a } return b }