in lib/torrent/storage/agentstorage/torrent.go [202:249]
func (t *Torrent) WritePiece(src storage.PieceReader, pi int) error {
piece, err := t.getPiece(pi)
if err != nil {
return err
}
if int64(src.Length()) != t.PieceLength(pi) {
return fmt.Errorf(
"invalid piece length: expected %d, got %d", t.PieceLength(pi), src.Length())
}
// Exit quickly if the piece is not writable.
if piece.complete() {
return storage.ErrPieceComplete
}
if piece.dirty() {
return errWritePieceConflict
}
dirty, complete := piece.tryMarkDirty()
if dirty {
return errWritePieceConflict
} else if complete {
return storage.ErrPieceComplete
}
// At this point, we've determined that the piece is not complete and ensured
// we are the only thread which may write the piece. We do not block other
// threads from checking if the piece is writable.
if err := t.writePiece(src, pi); err != nil {
// Allow other threads to write this piece since we mysteriously failed.
piece.markEmpty()
return fmt.Errorf("write piece: %s", err)
}
if int(t.numComplete.Load()) == len(t.pieces) {
// Multiple threads may attempt to move the download file to cache, however
// only one will succeed while the others will receive (and ignore) file exist
// error.
err := t.cads.MoveDownloadFileToCache(t.metaInfo.Digest().Hex())
if err != nil && !os.IsExist(err) {
return fmt.Errorf("download completed but failed to move file to cache directory: %s", err)
}
t.committed.Store(true)
}
return nil
}