lib/torrent/scheduler/dispatch/dispatcher.go
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dispatch
import (
"errors"
"fmt"
"sync"
"time"
"github.com/uber/kraken/core"
"github.com/uber/kraken/gen/go/proto/p2p"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"github.com/uber/kraken/lib/torrent/scheduler/dispatch/piecerequest"
"github.com/uber/kraken/lib/torrent/scheduler/torrentlog"
"github.com/uber/kraken/lib/torrent/storage"
"github.com/uber/kraken/utils/syncutil"
"github.com/andres-erbsen/clock"
"github.com/uber-go/tally"
"github.com/willf/bitset"
"go.uber.org/zap"
"golang.org/x/sync/syncmap"
)
var (
errPeerAlreadyDispatched = errors.New("peer is already dispatched for the torrent")
errPieceOutOfBounds = errors.New("piece index out of bounds")
errChunkNotSupported = errors.New("reading / writing chunk of piece not supported")
errRepeatedBitfieldMessage = errors.New("received repeated bitfield message")
)
// Events defines Dispatcher events.
type Events interface {
DispatcherComplete(*Dispatcher)
PeerRemoved(core.PeerID, core.InfoHash)
}
// Messages defines a subset of conn.Conn methods which Dispatcher requires to
// communicate with remote peers.
type Messages interface {
Send(msg *conn.Message) error
Receiver() <-chan *conn.Message
Close()
}
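// In production Messages is expected to be satisfied by conn.Conn. For unit
// tests, a minimal channel-backed stub is enough; the sketch below is
// illustrative only, and its type and field names are assumptions rather than
// part of this package:
//
//	type stubMessages struct {
//		sent []*conn.Message
//		recv chan *conn.Message
//	}
//
//	func (s *stubMessages) Send(msg *conn.Message) error {
//		s.sent = append(s.sent, msg)
//		return nil
//	}
//	func (s *stubMessages) Receiver() <-chan *conn.Message { return s.recv }
//	func (s *stubMessages) Close()                          { close(s.recv) }
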
// Dispatcher coordinates torrent state with the sending and receiving of
// messages among multiple peers. As such, Dispatcher and Torrent have a
// one-to-one relationship, while Dispatcher and Conn have a one-to-many
// relationship.
type Dispatcher struct {
config Config
stats tally.Scope
clk clock.Clock
createdAt time.Time
localPeerID core.PeerID
torrent *torrentAccessWatcher
peers syncmap.Map // core.PeerID -> *peer
peerStats syncmap.Map // core.PeerID -> *peerStats, persists on peer removal.
numPeersByPiece syncutil.Counters
netevents networkevent.Producer
pieceRequestTimeout time.Duration
pieceRequestManager *piecerequest.Manager
pendingPiecesDoneOnce sync.Once
pendingPiecesDone chan struct{}
completeOnce sync.Once
events Events
logger *zap.SugaredLogger
torrentlog *torrentlog.Logger
}
// New creates a new Dispatcher.
func New(
config Config,
stats tally.Scope,
clk clock.Clock,
netevents networkevent.Producer,
events Events,
peerID core.PeerID,
t storage.Torrent,
logger *zap.SugaredLogger,
tlog *torrentlog.Logger) (*Dispatcher, error) {
d, err := newDispatcher(config, stats, clk, netevents, events, peerID, t, logger, tlog)
if err != nil {
return nil, err
}
// Exits when d.pendingPiecesDone is closed.
go d.watchPendingPieceRequests()
if t.Complete() {
d.complete()
}
return d, nil
}
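// A rough usage sketch for New (illustrative only; the variable names are
// assumptions, and in practice the scheduler owns this wiring):
//
//	d, err := dispatch.New(
//		config, stats, clock.New(), netevents, schedulerEvents,
//		localPeerID, t, logger, tlog)
//	if err != nil {
//		return fmt.Errorf("new dispatcher: %s", err)
//	}
//	defer d.TearDown()
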
// newDispatcher creates a new Dispatcher with no side-effects for testing purposes.
func newDispatcher(
config Config,
stats tally.Scope,
clk clock.Clock,
netevents networkevent.Producer,
events Events,
peerID core.PeerID,
t storage.Torrent,
logger *zap.SugaredLogger,
tlog *torrentlog.Logger) (*Dispatcher, error) {
config = config.applyDefaults()
stats = stats.Tagged(map[string]string{
"module": "dispatch",
})
pieceRequestTimeout := config.calcPieceRequestTimeout(t.MaxPieceLength())
pieceRequestManager, err := piecerequest.NewManager(
clk, pieceRequestTimeout, config.PieceRequestPolicy, config.PipelineLimit)
if err != nil {
return nil, fmt.Errorf("piece request manager: %s", err)
}
return &Dispatcher{
config: config,
stats: stats,
clk: clk,
createdAt: clk.Now(),
localPeerID: peerID,
torrent: newTorrentAccessWatcher(t, clk),
numPeersByPiece: syncutil.NewCounters(t.NumPieces()),
netevents: netevents,
pieceRequestTimeout: pieceRequestTimeout,
pieceRequestManager: pieceRequestManager,
pendingPiecesDone: make(chan struct{}),
events: events,
logger: logger,
torrentlog: tlog,
}, nil
}
// Digest returns the blob digest for d's torrent.
func (d *Dispatcher) Digest() core.Digest {
return d.torrent.Digest()
}
// InfoHash returns d's torrent hash.
func (d *Dispatcher) InfoHash() core.InfoHash {
return d.torrent.InfoHash()
}
// Length returns d's torrent length.
func (d *Dispatcher) Length() int64 {
return d.torrent.Length()
}
// Stat returns d's TorrentInfo.
func (d *Dispatcher) Stat() *storage.TorrentInfo {
return d.torrent.Stat()
}
// Complete returns true if d's torrent is complete.
func (d *Dispatcher) Complete() bool {
return d.torrent.Complete()
}
// CreatedAt returns when d was created.
func (d *Dispatcher) CreatedAt() time.Time {
return d.createdAt
}
// LastGoodPieceReceived returns when d last received a valid and needed piece
// from peerID.
func (d *Dispatcher) LastGoodPieceReceived(peerID core.PeerID) time.Time {
v, ok := d.peers.Load(peerID)
if !ok {
return time.Time{}
}
return v.(*peer).getLastGoodPieceReceived()
}
// LastPieceSent returns when d last sent a piece to peerID.
func (d *Dispatcher) LastPieceSent(peerID core.PeerID) time.Time {
v, ok := d.peers.Load(peerID)
if !ok {
return time.Time{}
}
return v.(*peer).getLastPieceSent()
}
// LastReadTime returns when d's torrent was last read from.
func (d *Dispatcher) LastReadTime() time.Time {
return d.torrent.getLastReadTime()
}
// LastWriteTime returns when d's torrent was last written to.
func (d *Dispatcher) LastWriteTime() time.Time {
return d.torrent.getLastWriteTime()
}
// Empty returns true if the Dispatcher has no peers.
func (d *Dispatcher) Empty() bool {
empty := true
d.peers.Range(func(k, v interface{}) bool {
empty = false
return false
})
return empty
}
// RemoteBitfields returns the bitfields of peers connected to the dispatcher.
func (d *Dispatcher) RemoteBitfields() conn.RemoteBitfields {
remoteBitfields := make(conn.RemoteBitfields)
d.peers.Range(func(k, v interface{}) bool {
remoteBitfields[k.(core.PeerID)] = v.(*peer).bitfield.Copy()
return true
})
return remoteBitfields
}
// AddPeer registers a new peer with the Dispatcher.
func (d *Dispatcher) AddPeer(
peerID core.PeerID, b *bitset.BitSet, messages Messages) error {
p, err := d.addPeer(peerID, b, messages)
if err != nil {
return err
}
go d.maybeRequestMorePieces(p)
go d.feed(p)
return nil
}
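// Sketch of the intended call site (illustrative only; the scheduler performs
// this after the handshake with the remote peer has produced its peer ID, its
// bitfield, and an established connection c, which satisfies Messages):
//
//	if err := d.AddPeer(remotePeerID, remoteBitfield, c); err != nil {
//		c.Close()
//	}
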
// addPeer creates and inserts a new peer into the Dispatcher. It is split out
// from AddPeer, without the goroutine side-effects, for testing purposes.
func (d *Dispatcher) addPeer(
peerID core.PeerID, b *bitset.BitSet, messages Messages) (*peer, error) {
pstats := &peerStats{}
if s, ok := d.peerStats.LoadOrStore(peerID, pstats); ok {
pstats = s.(*peerStats)
}
p := newPeer(peerID, b, messages, d.clk, pstats)
if _, ok := d.peers.LoadOrStore(peerID, p); ok {
return nil, errPeerAlreadyDispatched
}
for _, i := range p.bitfield.GetAllSet() {
d.numPeersByPiece.Increment(int(i))
}
return p, nil
}
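// removePeer deletes p from the Dispatcher, clears p's outstanding piece
// request reservations, and decrements the availability count of each piece
// p had.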
func (d *Dispatcher) removePeer(p *peer) error {
d.peers.Delete(p.id)
d.pieceRequestManager.ClearPeer(p.id)
for _, i := range p.bitfield.GetAllSet() {
d.numPeersByPiece.Decrement(int(i))
}
return nil
}
// TearDown closes all Dispatcher connections.
func (d *Dispatcher) TearDown() {
d.pendingPiecesDoneOnce.Do(func() {
close(d.pendingPiecesDone)
})
d.peers.Range(func(k, v interface{}) bool {
p := v.(*peer)
d.log("peer", p).Info("Dispatcher teardown closing connection")
p.messages.Close()
return true
})
summaries := make(torrentlog.LeecherSummaries, 0)
d.peerStats.Range(func(k, v interface{}) bool {
peerID := k.(core.PeerID)
pstats := v.(*peerStats)
summaries = append(summaries, torrentlog.LeecherSummary{
PeerID: peerID,
RequestsReceived: pstats.getPieceRequestsReceived(),
PiecesSent: pstats.getPiecesSent(),
})
return true
})
if err := d.torrentlog.LeecherSummaries(
d.torrent.Digest(), d.torrent.InfoHash(), summaries); err != nil {
d.log().Errorf("Error logging incoming piece request summary: %s", err)
}
}
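// String returns a human-readable representation of d for logging.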
func (d *Dispatcher) String() string {
return fmt.Sprintf("Dispatcher(%s)", d.torrent)
}
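// complete handles d's torrent becoming complete: it signals DispatcherComplete
// (once), stops pending piece request retries, closes connections to peers which
// have also completed, notifies in-progress peers that all pieces are now
// available, and, if any pieces were requested from other peers, logs per-peer
// seeder summaries.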
func (d *Dispatcher) complete() {
d.completeOnce.Do(func() { go d.events.DispatcherComplete(d) })
d.pendingPiecesDoneOnce.Do(func() { close(d.pendingPiecesDone) })
d.peers.Range(func(k, v interface{}) bool {
p := v.(*peer)
if p.bitfield.Complete() {
// Close connections to other completed peers since those connections
// are now useless.
d.log("peer", p).Info("Closing connection to completed peer")
p.messages.Close()
} else {
// Notify in-progress peers that we have completed the torrent and
// all pieces are available.
p.messages.Send(conn.NewCompleteMessage())
}
return true
})
var piecesRequestedTotal int
summaries := make(torrentlog.SeederSummaries, 0)
d.peerStats.Range(func(k, v interface{}) bool {
peerID := k.(core.PeerID)
pstats := v.(*peerStats)
requested := pstats.getPieceRequestsSent()
piecesRequestedTotal += requested
summary := torrentlog.SeederSummary{
PeerID: peerID,
RequestsSent: requested,
GoodPiecesReceived: pstats.getGoodPiecesReceived(),
DuplicatePiecesReceived: pstats.getDuplicatePiecesReceived(),
}
summaries = append(summaries, summary)
return true
})
// Only log if we actually requested pieces from others.
if piecesRequestedTotal > 0 {
if err := d.torrentlog.SeederSummaries(
d.torrent.Digest(), d.torrent.InfoHash(), summaries); err != nil {
d.log().Errorf("Error logging outgoing piece request summary: %s", err)
}
}
}
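// endgame returns true if endgame mode is enabled and the number of pieces
// still missing from d's torrent is at most EndgameThreshold.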
func (d *Dispatcher) endgame() bool {
if d.config.DisableEndgame {
return false
}
remaining := d.torrent.NumPieces() - int(d.torrent.Bitfield().Count())
return remaining <= d.config.EndgameThreshold
}
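// maybeRequestMorePieces attempts to send piece requests to p for pieces which
// p has and d's torrent is still missing.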
func (d *Dispatcher) maybeRequestMorePieces(p *peer) (bool, error) {
candidates := p.bitfield.Intersection(d.torrent.Bitfield().Complement())
return d.maybeSendPieceRequests(p, candidates)
}
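// maybeSendPieceRequests reserves pieces from candidates via the piece request
// manager and sends p a request for each reserved piece. It returns false if no
// pieces could be reserved or if a send failed (e.g. the connection closed).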
func (d *Dispatcher) maybeSendPieceRequests(p *peer, candidates *bitset.BitSet) (bool, error) {
pieces, err := d.pieceRequestManager.ReservePieces(p.id, candidates, d.numPeersByPiece, d.endgame())
if err != nil {
return false, err
}
if len(pieces) == 0 {
return false, nil
}
for _, i := range pieces {
if err := p.messages.Send(conn.NewPieceRequestMessage(i, d.torrent.PieceLength(i))); err != nil {
// Connection closed.
d.pieceRequestManager.MarkUnsent(p.id, i)
return false, err
}
d.netevents.Produce(
networkevent.RequestPieceEvent(d.torrent.InfoHash(), d.localPeerID, p.id, i))
p.pstats.incrementPieceRequestsSent()
}
return true, nil
}
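// resendFailedPieceRequests attempts to re-send failed (expired, unsent, or
// invalid) piece requests to other eligible peers that have the missing pieces.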
func (d *Dispatcher) resendFailedPieceRequests() {
failedRequests := d.pieceRequestManager.GetFailedRequests()
if len(failedRequests) > 0 {
d.log().Infof("Resending %d failed piece requests", len(failedRequests))
d.stats.Counter("piece_request_failures").Inc(int64(len(failedRequests)))
}
var sent int
for _, r := range failedRequests {
d.peers.Range(func(k, v interface{}) bool {
p := v.(*peer)
if (r.Status == piecerequest.StatusExpired || r.Status == piecerequest.StatusInvalid) &&
r.PeerID == p.id {
// Do not resend to the same peer for expired or invalid requests.
return true
}
b := d.torrent.Bitfield()
candidates := p.bitfield.Intersection(b.Complement())
if candidates.Test(uint(r.Piece)) {
nb := bitset.New(b.Len()).Set(uint(r.Piece))
if ok, err := d.maybeSendPieceRequests(p, nb); ok && err == nil {
sent++
return false
}
}
return true
})
}
unsent := len(failedRequests) - sent
if unsent > 0 {
d.log().Infof("Nowhere to resend %d / %d failed piece requests", unsent, len(failedRequests))
}
}
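// watchPendingPieceRequests periodically resends failed piece requests, polling
// at half the piece request timeout, until pendingPiecesDone is closed.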
func (d *Dispatcher) watchPendingPieceRequests() {
for {
select {
case <-d.clk.After(d.pieceRequestTimeout / 2):
d.resendFailedPieceRequests()
case <-d.pendingPiecesDone:
return
}
}
}
// feed reads from p's message channel and dispatches each incoming message. When
// the channel closes, feed removes p from the Dispatcher and exits.
func (d *Dispatcher) feed(p *peer) {
for msg := range p.messages.Receiver() {
if err := d.dispatch(p, msg); err != nil {
d.log().Errorf("Error dispatching message: %s", err)
}
}
d.removePeer(p)
d.events.PeerRemoved(p.id, d.torrent.InfoHash())
}
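// dispatch routes a single message from p to the appropriate handler. Unknown
// message types are returned as an error.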
func (d *Dispatcher) dispatch(p *peer, msg *conn.Message) error {
switch msg.Message.Type {
case p2p.Message_ERROR:
d.handleError(p, msg.Message.Error)
case p2p.Message_ANNOUCE_PIECE:
d.handleAnnouncePiece(p, msg.Message.AnnouncePiece)
case p2p.Message_PIECE_REQUEST:
d.handlePieceRequest(p, msg.Message.PieceRequest)
case p2p.Message_PIECE_PAYLOAD:
d.handlePiecePayload(p, msg.Message.PiecePayload, msg.Payload)
case p2p.Message_CANCEL_PIECE:
d.handleCancelPiece(p, msg.Message.CancelPiece)
case p2p.Message_BITFIELD:
d.handleBitfield(p, msg.Message.Bitfield)
case p2p.Message_COMPLETE:
d.handleComplete(p)
default:
return fmt.Errorf("unknown message type: %d", msg.Message.Type)
}
return nil
}
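// handleError processes an error message from p. Currently only piece request
// failures are handled, by marking the corresponding request invalid.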
func (d *Dispatcher) handleError(p *peer, msg *p2p.ErrorMessage) {
switch msg.Code {
case p2p.ErrorMessage_PIECE_REQUEST_FAILED:
d.log().Errorf("Piece request failed: %s", msg.Error)
d.pieceRequestManager.MarkInvalid(p.id, int(msg.Index))
}
}
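// handleAnnouncePiece records that p now has the announced piece, updates piece
// availability counts, and possibly requests more pieces from p.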
func (d *Dispatcher) handleAnnouncePiece(p *peer, msg *p2p.AnnouncePieceMessage) {
if int(msg.Index) >= d.torrent.NumPieces() {
d.log().Errorf("Announce piece out of bounds: %d >= %d", msg.Index, d.torrent.NumPieces())
return
}
i := int(msg.Index)
p.bitfield.Set(uint(i), true)
d.numPeersByPiece.Increment(int(i))
d.maybeRequestMorePieces(p)
}
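// isFullPiece returns true if the given offset and length describe the entire
// piece i. Chunked (partial) piece transfers are not supported.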
func (d *Dispatcher) isFullPiece(i, offset, length int) bool {
return offset == 0 && length == int(d.torrent.PieceLength(i))
}
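// handlePieceRequest serves a piece requested by p, replying with an error
// message if the piece cannot be read or only a chunk of it was requested.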
func (d *Dispatcher) handlePieceRequest(p *peer, msg *p2p.PieceRequestMessage) {
p.pstats.incrementPieceRequestsReceived()
i := int(msg.Index)
if !d.isFullPiece(i, int(msg.Offset), int(msg.Length)) {
d.log("peer", p, "piece", i).Error("Rejecting piece request: chunk not supported")
p.messages.Send(conn.NewErrorMessage(i, p2p.ErrorMessage_PIECE_REQUEST_FAILED, errChunkNotSupported))
return
}
payload, err := d.torrent.GetPieceReader(i)
if err != nil {
d.log("peer", p, "piece", i).Errorf("Error getting reader for requested piece: %s", err)
p.messages.Send(conn.NewErrorMessage(i, p2p.ErrorMessage_PIECE_REQUEST_FAILED, err))
return
}
if err := p.messages.Send(conn.NewPiecePayloadMessage(i, payload)); err != nil {
return
}
p.touchLastPieceSent()
p.pstats.incrementPiecesSent()
// Assume that the peer successfully received the piece.
p.bitfield.Set(uint(i), true)
}
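// handlePiecePayload writes a piece received from p to d's torrent, updates
// request state and peer stats, triggers completion if this was the final piece,
// and announces the new piece to all other connected peers.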
func (d *Dispatcher) handlePiecePayload(
p *peer, msg *p2p.PiecePayloadMessage, payload storage.PieceReader) {
defer payload.Close()
i := int(msg.Index)
if !d.isFullPiece(i, int(msg.Offset), int(msg.Length)) {
d.log("peer", p, "piece", i).Error("Rejecting piece payload: chunk not supported")
d.pieceRequestManager.MarkInvalid(p.id, i)
return
}
if err := d.torrent.WritePiece(payload, i); err != nil {
if err != storage.ErrPieceComplete {
d.log("peer", p, "piece", i).Errorf("Error writing piece payload: %s", err)
d.pieceRequestManager.MarkInvalid(p.id, i)
} else {
p.pstats.incrementDuplicatePiecesReceived()
}
return
}
d.netevents.Produce(
networkevent.ReceivePieceEvent(d.torrent.InfoHash(), d.localPeerID, p.id, i))
p.pstats.incrementGoodPiecesReceived()
p.touchLastGoodPieceReceived()
if d.torrent.Complete() {
d.complete()
}
d.pieceRequestManager.Clear(i)
d.maybeRequestMorePieces(p)
d.peers.Range(func(k, v interface{}) bool {
if k.(core.PeerID) == p.id {
return true
}
pp := v.(*peer)
pp.messages.Send(conn.NewAnnouncePieceMessage(i))
return true
})
}
func (d *Dispatcher) handleCancelPiece(p *peer, msg *p2p.CancelPieceMessage) {
// No-op: cancelling is not supported because all received messages are handled
// synchronously, so by the time a cancel arrives it is already too late -- the
// piece has already been read.
}
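// handleBitfield logs an error, since bitfields are expected only during the
// initial handshake and not on an established connection.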
func (d *Dispatcher) handleBitfield(p *peer, msg *p2p.BitfieldMessage) {
d.log("peer", p).Error("Unexpected bitfield message from established conn")
}
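// handleComplete marks p as having every piece. If d's torrent is also complete,
// the connection is closed since it is no longer useful.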
func (d *Dispatcher) handleComplete(p *peer) {
if d.Complete() {
d.log("peer", p).Info("Closing connection to completed peer")
p.messages.Close()
} else {
p.bitfield.SetAll(true)
d.maybeRequestMorePieces(p)
}
}
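// log returns a logger annotated with d's torrent plus any additional key-value
// pairs passed as args.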
func (d *Dispatcher) log(args ...interface{}) *zap.SugaredLogger {
args = append(args, "torrent", d.torrent)
return d.logger.With(args...)
}