lib/torrent/storage/agentstorage/pieces.go (116 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package agentstorage import ( "fmt" "regexp" "sync" "github.com/uber/kraken/core" "github.com/uber/kraken/lib/store/metadata" "github.com/uber/kraken/utils/log" ) const _pieceStatusSuffix = "_status" func init() { metadata.Register(regexp.MustCompile(_pieceStatusSuffix), pieceStatusMetadataFactory{}) } type pieceStatus int const ( _empty pieceStatus = iota _complete _dirty ) type pieceStatusMetadataFactory struct{} func (m pieceStatusMetadataFactory) Create(suffix string) metadata.Metadata { return &pieceStatusMetadata{} } // pieceStatusMetadata stores pieces statuses as metadata on disk. type pieceStatusMetadata struct { pieces []*piece } func newPieceStatusMetadata(pieces []*piece) *pieceStatusMetadata { return &pieceStatusMetadata{pieces} } func (m *pieceStatusMetadata) GetSuffix() string { return _pieceStatusSuffix } func (m *pieceStatusMetadata) Movable() bool { return true } func (m *pieceStatusMetadata) Serialize() ([]byte, error) { b := make([]byte, len(m.pieces)) for i, p := range m.pieces { b[i] = byte(p.status) } return b, nil } func (m *pieceStatusMetadata) Deserialize(b []byte) error { m.pieces = make([]*piece, len(b)) for i := range b { status := pieceStatus(b[i]) if status != _empty && status != _complete { log.Errorf("Unexpected status in piece metadata: %d", status) status = _empty } m.pieces[i] = &piece{status: status} } return nil } type piece struct { sync.RWMutex status pieceStatus } func (p *piece) complete() bool { p.RLock() defer p.RUnlock() return p.status == _complete } func (p *piece) dirty() bool { p.RLock() defer p.RUnlock() return p.status == _dirty } func (p *piece) tryMarkDirty() (dirty, complete bool) { p.Lock() defer p.Unlock() switch p.status { case _empty: p.status = _dirty case _dirty: dirty = true case _complete: complete = true default: log.Fatalf("Unknown piece status: %d", p.status) } return } func (p *piece) markEmpty() { p.Lock() defer p.Unlock() p.status = _empty } func (p *piece) markComplete() { p.Lock() defer p.Unlock() p.status = _complete } // restorePieces reads piece metadata from disk and restores the in-memory piece // statuses. A naive solution would be to read the entire blob from disk and // hash the pieces to determine completion status -- however, this is very // expensive. Instead, Torrent tracks completed pieces on disk via metadata // as they are written. func restorePieces( d core.Digest, cads caDownloadStore, numPieces int) (pieces []*piece, numComplete int, err error) { for i := 0; i < numPieces; i++ { pieces = append(pieces, &piece{status: _empty}) } md := newPieceStatusMetadata(pieces) if err := cads.Download().GetOrSetMetadata(d.Hex(), md); cads.InCacheError(err) { // File is in cache state -- initialize completed pieces. for _, p := range pieces { p.status = _complete } return pieces, numPieces, nil } else if err != nil { return nil, 0, fmt.Errorf("get or set piece metadata: %s", err) } for _, p := range md.pieces { if p.status == _complete { numComplete++ } } return md.pieces, numComplete, nil }