in lib/torrent/scheduler/dispatch/dispatcher.go [577:633]
func (d *Dispatcher) handlePiecePayload(
p *peer, msg *p2p.PiecePayloadMessage, payload storage.PieceReader) {
defer closers.Close(payload)
i := int(msg.Index)
if !d.isFullPiece(i, int(msg.Offset), int(msg.Length)) {
d.log("peer", p, "piece", i).Error("Rejecting piece payload: chunk not supported")
d.pieceRequestManager.MarkInvalid(p.id, i)
return
}
if err := d.torrent.WritePiece(payload, i); err != nil {
if err != storage.ErrPieceComplete {
d.log("peer", p, "piece", i).Errorf("Error writing piece payload: %s", err)
d.pieceRequestManager.MarkInvalid(p.id, i)
} else {
p.pstats.incrementDuplicatePiecesReceived()
}
return
}
d.netevents.Produce(
networkevent.ReceivePieceEvent(d.torrent.InfoHash(), d.localPeerID, p.id, i))
p.pstats.incrementGoodPiecesReceived()
p.touchLastGoodPieceReceived()
if d.torrent.Complete() {
d.complete()
}
d.pieceRequestManager.Clear(i)
if _, err := d.maybeRequestMorePieces(p); err != nil {
d.log("peer", p).Errorf("Error requesting more pieces: %s", err)
}
d.peers.Range(func(k, v interface{}) bool {
peerID, ok := k.(core.PeerID)
if !ok {
panic(fmt.Sprintf("dispatcher: stored key is not core.PeerID: %T", k))
}
if peerID == p.id {
return true
}
pp, ok := v.(*peer)
if !ok {
panic(fmt.Sprintf("dispatcher: stored value is not *peer: %T", v))
}
if err := pp.messages.Send(conn.NewAnnouncePieceMessage(i)); err != nil {
d.log("peer", pp).Errorf("Error sending announce piece message: %s", err)
}
return true
})
}