lib/torrent/scheduler/conn/handshaker.go (280 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package conn import ( "errors" "fmt" "net" "github.com/uber/kraken/core" "github.com/uber/kraken/gen/go/proto/p2p" "github.com/uber/kraken/lib/torrent/networkevent" "github.com/uber/kraken/lib/torrent/storage" "github.com/uber/kraken/utils/bandwidth" "github.com/andres-erbsen/clock" "github.com/uber-go/tally" "github.com/willf/bitset" "go.uber.org/zap" ) // RemoteBitfields represents the bitfields of an agent's peers for a given torrent. type RemoteBitfields map[core.PeerID]*bitset.BitSet func (rb RemoteBitfields) marshalBinary() (map[string][]byte, error) { rbBytes := make(map[string][]byte) for peerID, bitfield := range rb { b, err := bitfield.MarshalBinary() if err != nil { return nil, err } rbBytes[peerID.String()] = b } return rbBytes, nil } func (rb RemoteBitfields) unmarshalBinary(rbBytes map[string][]byte) error { for peerIDStr, bitfieldBytes := range rbBytes { peerID, err := core.NewPeerID(peerIDStr) if err != nil { return fmt.Errorf("peer id: %s", err) } bitfield := bitset.New(0) if err := bitfield.UnmarshalBinary(bitfieldBytes); err != nil { return err } rb[peerID] = bitfield } return nil } // handshake contains the same fields as a protobuf bitfield message, but with // the fields converted into types used within the scheduler package. As such, // in this package "handshake" and "bitfield message" are usually synonymous. type handshake struct { peerID core.PeerID digest core.Digest infoHash core.InfoHash bitfield *bitset.BitSet remoteBitfields RemoteBitfields namespace string } func (h *handshake) toP2PMessage() (*p2p.Message, error) { b, err := h.bitfield.MarshalBinary() if err != nil { return nil, err } rb, err := h.remoteBitfields.marshalBinary() if err != nil { return nil, err } return &p2p.Message{ Type: p2p.Message_BITFIELD, Bitfield: &p2p.BitfieldMessage{ PeerID: h.peerID.String(), Name: h.digest.Hex(), InfoHash: h.infoHash.String(), BitfieldBytes: b, RemoteBitfieldBytes: rb, Namespace: h.namespace, }, }, nil } func handshakeFromP2PMessage(m *p2p.Message) (*handshake, error) { if m.Type != p2p.Message_BITFIELD { return nil, fmt.Errorf("expected bitfield message, got %s", m.Type) } bitfieldMsg := m.GetBitfield() if bitfieldMsg == nil { return nil, fmt.Errorf("empty bit field") } peerID, err := core.NewPeerID(bitfieldMsg.PeerID) if err != nil { return nil, fmt.Errorf("peer id: %s", err) } ih, err := core.NewInfoHashFromHex(bitfieldMsg.InfoHash) if err != nil { return nil, fmt.Errorf("info hash: %s", err) } d, err := core.NewSHA256DigestFromHex(bitfieldMsg.Name) if err != nil { return nil, fmt.Errorf("name: %s", err) } bitfield := bitset.New(0) if err := bitfield.UnmarshalBinary(bitfieldMsg.BitfieldBytes); err != nil { return nil, err } remoteBitfields := make(RemoteBitfields) if err := remoteBitfields.unmarshalBinary(bitfieldMsg.RemoteBitfieldBytes); err != nil { return nil, err } return &handshake{ peerID: peerID, infoHash: ih, bitfield: bitfield, digest: d, namespace: bitfieldMsg.Namespace, remoteBitfields: remoteBitfields, }, nil } // PendingConn represents half-opened, pending connection initialized by a // remote peer. type PendingConn struct { handshake *handshake nc net.Conn } // PeerID returns the remote peer id. func (pc *PendingConn) PeerID() core.PeerID { return pc.handshake.peerID } // Digest returns the digest of the blob the remote peer wants to open. func (pc *PendingConn) Digest() core.Digest { return pc.handshake.digest } // InfoHash returns the info hash of the torrent the remote peer wants to open. func (pc *PendingConn) InfoHash() core.InfoHash { return pc.handshake.infoHash } // Bitfield returns the bitfield of the remote peer's torrent. func (pc *PendingConn) Bitfield() *bitset.BitSet { return pc.handshake.bitfield } // RemoteBitfields returns the bitfield of the remote peer's torrent. func (pc *PendingConn) RemoteBitfields() RemoteBitfields { return pc.handshake.remoteBitfields } // Namespace returns the namespace of the remote peer's torrent. func (pc *PendingConn) Namespace() string { return pc.handshake.namespace } // Close closes the connection. func (pc *PendingConn) Close() { pc.nc.Close() } // HandshakeResult wraps data returned from a successful handshake. type HandshakeResult struct { Conn *Conn Bitfield *bitset.BitSet RemoteBitfields RemoteBitfields } // Handshaker defines the handshake protocol for establishing connections to // other peers. type Handshaker struct { config Config stats tally.Scope clk clock.Clock bandwidth *bandwidth.Limiter networkEvents networkevent.Producer peerID core.PeerID events Events } // NewHandshaker creates a new Handshaker. func NewHandshaker( config Config, stats tally.Scope, clk clock.Clock, networkEvents networkevent.Producer, peerID core.PeerID, events Events, logger *zap.SugaredLogger) (*Handshaker, error) { config = config.applyDefaults() stats = stats.Tagged(map[string]string{ "module": "conn", }) bl, err := bandwidth.NewLimiter(config.Bandwidth, bandwidth.WithLogger(logger)) if err != nil { return nil, fmt.Errorf("bandwidth: %s", err) } return &Handshaker{ config: config, stats: stats, clk: clk, bandwidth: bl, networkEvents: networkEvents, peerID: peerID, events: events, }, nil } // Accept upgrades a raw network connection opened by a remote peer into a // PendingConn. func (h *Handshaker) Accept(nc net.Conn) (*PendingConn, error) { hs, err := h.readHandshake(nc) if err != nil { return nil, fmt.Errorf("read handshake: %s", err) } return &PendingConn{hs, nc}, nil } // Establish upgrades a PendingConn returned via Accept into a fully // established Conn. func (h *Handshaker) Establish( pc *PendingConn, info *storage.TorrentInfo, remoteBitfields RemoteBitfields) (*Conn, error) { // Namespace is one-directional: it is only supplied by the connection opener // and is not reciprocated by the connection acceptor. if err := h.sendHandshake(pc.nc, info, remoteBitfields, ""); err != nil { return nil, fmt.Errorf("send handshake: %s", err) } c, err := h.newConn(pc.nc, pc.handshake.peerID, info, true) if err != nil { return nil, fmt.Errorf("new conn: %s", err) } return c, nil } // Initialize returns a fully established Conn for the given torrent to the // given peer / address. Also returns the bitfield of the remote peer and // its connections for the torrent. func (h *Handshaker) Initialize( peerID core.PeerID, addr string, info *storage.TorrentInfo, remoteBitfields RemoteBitfields, namespace string) (*HandshakeResult, error) { nc, err := net.DialTimeout("tcp", addr, h.config.HandshakeTimeout) if err != nil { return nil, fmt.Errorf("dial: %s", err) } r, err := h.fullHandshake(nc, peerID, info, remoteBitfields, namespace) if err != nil { nc.Close() return nil, err } return r, nil } func (h *Handshaker) sendHandshake( nc net.Conn, info *storage.TorrentInfo, remoteBitfields RemoteBitfields, namespace string) error { hs := &handshake{ peerID: h.peerID, digest: info.Digest(), infoHash: info.InfoHash(), bitfield: info.Bitfield(), remoteBitfields: remoteBitfields, namespace: namespace, } msg, err := hs.toP2PMessage() if err != nil { return err } return sendMessageWithTimeout(nc, msg, h.config.HandshakeTimeout) } func (h *Handshaker) readHandshake(nc net.Conn) (*handshake, error) { m, err := readMessageWithTimeout(nc, h.config.HandshakeTimeout) if err != nil { return nil, fmt.Errorf("read message: %s", err) } hs, err := handshakeFromP2PMessage(m) if err != nil { return nil, fmt.Errorf("handshake from p2p message: %s", err) } return hs, nil } func (h *Handshaker) fullHandshake( nc net.Conn, peerID core.PeerID, info *storage.TorrentInfo, remoteBitfields RemoteBitfields, namespace string) (*HandshakeResult, error) { if err := h.sendHandshake(nc, info, remoteBitfields, namespace); err != nil { return nil, fmt.Errorf("send handshake: %s", err) } hs, err := h.readHandshake(nc) if err != nil { return nil, fmt.Errorf("read handshake: %s", err) } if hs.peerID != peerID { return nil, errors.New("unexpected peer id") } c, err := h.newConn(nc, peerID, info, false) if err != nil { return nil, fmt.Errorf("new conn: %s", err) } return &HandshakeResult{c, hs.bitfield, hs.remoteBitfields}, nil } func (h *Handshaker) newConn( nc net.Conn, peerID core.PeerID, info *storage.TorrentInfo, openedByRemote bool) (*Conn, error) { return newConn( h.config, h.stats, h.clk, h.networkEvents, h.bandwidth, h.events, nc, h.peerID, peerID, info, openedByRemote, zap.NewNop().Sugar()) }