func()

in lib/torrent/scheduler/dispatch/dispatcher.go [577:633]


// handlePiecePayload processes a piece payload message received from peer p.
//
// payload is handed to closers.Close when the function returns, on every path.
//
// Rejection paths (the function returns without announcing anything):
//   - isFullPiece reports false for the message's index/offset/length: the
//     payload is logged as unsupported and the piece is marked invalid for p.
//   - WritePiece returns storage.ErrPieceComplete: counted on p's stats as a
//     duplicate piece.
//   - WritePiece returns any other error: logged, and the piece is marked
//     invalid for p.
//
// Success path, in order: a receive-piece network event is produced, p's
// good-piece stats and timestamp are updated, d.complete is invoked if the
// torrent reports itself complete, the piece is cleared from the piece request
// manager, more pieces may be requested from p, and an announce piece message
// is sent to every stored peer other than p.
func (d *Dispatcher) handlePiecePayload(
	p *peer, msg *p2p.PiecePayloadMessage, payload storage.PieceReader) {

	defer closers.Close(payload)

	piece := int(msg.Index)
	offset, length := int(msg.Offset), int(msg.Length)
	if fullPiece := d.isFullPiece(piece, offset, length); !fullPiece {
		d.log("peer", p, "piece", piece).Error("Rejecting piece payload: chunk not supported")
		d.pieceRequestManager.MarkInvalid(p.id, piece)
		return
	}

	switch writeErr := d.torrent.WritePiece(payload, piece); writeErr {
	case nil:
		// Written successfully; carry on with the bookkeeping below.
	case storage.ErrPieceComplete:
		p.pstats.incrementDuplicatePiecesReceived()
		return
	default:
		d.log("peer", p, "piece", piece).Errorf("Error writing piece payload: %s", writeErr)
		d.pieceRequestManager.MarkInvalid(p.id, piece)
		return
	}

	received := networkevent.ReceivePieceEvent(
		d.torrent.InfoHash(), d.localPeerID, p.id, piece)
	d.netevents.Produce(received)

	p.pstats.incrementGoodPiecesReceived()
	p.touchLastGoodPieceReceived()
	if d.torrent.Complete() {
		d.complete()
	}

	d.pieceRequestManager.Clear(piece)

	// A failure to request more pieces is only logged; the announcement to
	// the other peers below still goes out.
	_, reqErr := d.maybeRequestMorePieces(p)
	if reqErr != nil {
		d.log("peer", p).Errorf("Error requesting more pieces: %s", reqErr)
	}

	// Announce the piece to every stored peer except p. The callback always
	// returns true, so the iteration is never cut short.
	d.peers.Range(func(key, value interface{}) bool {
		id, isPeerID := key.(core.PeerID)
		if !isPeerID {
			panic(fmt.Sprintf("dispatcher: stored key is not core.PeerID: %T", key))
		}
		if id == p.id {
			// Skip p itself.
			return true
		}
		other, isPeer := value.(*peer)
		if !isPeer {
			panic(fmt.Sprintf("dispatcher: stored value is not *peer: %T", value))
		}

		sendErr := other.messages.Send(conn.NewAnnouncePieceMessage(piece))
		if sendErr != nil {
			d.log("peer", other).Errorf("Error sending announce piece message: %s", sendErr)
		}

		return true
	})
}