// lib/torrent/scheduler/conn/conn.go
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package conn

import (
"errors"
"fmt"
"io"
"net"
"sync"
"time"
"github.com/andres-erbsen/clock"
"github.com/uber-go/tally"
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/uber/kraken/core"
"github.com/uber/kraken/gen/go/proto/p2p"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/storage"
"github.com/uber/kraken/lib/torrent/storage/piecereader"
"github.com/uber/kraken/utils/bandwidth"
"github.com/uber/kraken/utils/memsize"
)

// Maximum supported protocol message size. Does not include piece payload.
const maxMessageSize = 32 * memsize.KB

// Events defines Conn events.
type Events interface {
ConnClosed(*Conn)
}
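
// noopEvents is an illustrative, minimal Events implementation; it is not
// part of the original file. In practice the scheduler supplies Events and
// typically removes the closed conn from its bookkeeping in ConnClosed.
type noopEvents struct{}

// ConnClosed does nothing; a real implementation reacts to the closure.
func (noopEvents) ConnClosed(*Conn) {}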

// Conn manages peer communication over a connection for a single torrent,
// identified by its info hash; inbound messages for that torrent are routed
// onto the receiver channel.
type Conn struct {
peerID core.PeerID
infoHash core.InfoHash
createdAt time.Time
localPeerID core.PeerID
bandwidth *bandwidth.Limiter
events Events

mu sync.Mutex // Protects the following fields:
lastGoodPieceReceived time.Time
lastPieceSent time.Time

nc net.Conn

config Config
clk clock.Clock
stats tally.Scope
networkEvents networkevent.Producer

// Marks whether the connection was opened by the remote peer or the local peer.
openedByRemote bool

startOnce sync.Once

sender chan *Message
receiver chan *Message

// The following fields orchestrate the closing of the connection:
closed *atomic.Bool
done chan struct{} // Signals to readLoop / writeLoop to exit.
wg sync.WaitGroup // Waits for readLoop / writeLoop to exit.

logger *zap.SugaredLogger
}

func newConn(
config Config,
stats tally.Scope,
clk clock.Clock,
networkEvents networkevent.Producer,
bandwidth *bandwidth.Limiter,
events Events,
nc net.Conn,
localPeerID core.PeerID,
remotePeerID core.PeerID,
info *storage.TorrentInfo,
openedByRemote bool,
logger *zap.SugaredLogger) (*Conn, error) {
// Clear all deadlines set during handshake. Once a Conn is created, we
// rely on our own idle Conn management via preemption events.
if err := nc.SetDeadline(time.Time{}); err != nil {
return nil, fmt.Errorf("set deadline: %s", err)
}
c := &Conn{
peerID: remotePeerID,
infoHash: info.InfoHash(),
createdAt: clk.Now(),
localPeerID: localPeerID,
bandwidth: bandwidth,
events: events,
nc: nc,
config: config,
clk: clk,
stats: stats,
networkEvents: networkEvents,
openedByRemote: openedByRemote,
sender: make(chan *Message, config.SenderBufferSize),
receiver: make(chan *Message, config.ReceiverBufferSize),
closed: atomic.NewBool(false),
done: make(chan struct{}),
logger: logger,
}
return c, nil
}

// Start starts message processing on c. It is safe to call multiple times;
// only the first call has any effect. Note that once c has been started, it
// may close itself if it encounters an error reading from or writing to the
// underlying socket.
func (c *Conn) Start() {
c.startOnce.Do(func() {
c.wg.Add(2)
go c.readLoop()
go c.writeLoop()
})
}

// PeerID returns the remote peer id.
func (c *Conn) PeerID() core.PeerID {
return c.peerID
}

// InfoHash returns the info hash for the torrent being transmitted over this
// connection.
func (c *Conn) InfoHash() core.InfoHash {
return c.infoHash
}

// CreatedAt returns the time at which the Conn was created.
func (c *Conn) CreatedAt() time.Time {
return c.createdAt
}

func (c *Conn) String() string {
return fmt.Sprintf("Conn(peer=%s, hash=%s, opened_by_remote=%t)",
c.peerID, c.infoHash, c.openedByRemote)
}

// Send enqueues the given message for writing to the underlying connection.
// It never blocks: if the send buffer is full, the message is dropped and an
// error is returned.
func (c *Conn) Send(msg *Message) error {
select {
case <-c.done:
return errors.New("conn closed")
case c.sender <- msg:
return nil
default:
// TODO(codyg): Consider a timeout here instead.
c.stats.Tagged(map[string]string{
"dropped_message_type": msg.Message.Type.String(),
}).Counter("dropped_messages").Inc(1)
return errors.New("send buffer full")
}
}

// Receiver returns a read-only channel for reading incoming messages off the
// connection. The channel is closed once the read loop exits.
func (c *Conn) Receiver() <-chan *Message {
return c.receiver
}
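
// handleConn is an illustrative sketch, not part of the original file,
// showing one way a caller might drive a Conn; the reply callback is a
// hypothetical stand-in for application-level message handling.
func handleConn(c *Conn, reply func(*Message) *Message) {
	c.Start()
	// Receiver is closed when the read loop exits, so this loop ends once
	// the conn begins shutting down.
	for msg := range c.Receiver() {
		if out := reply(msg); out != nil {
			if err := c.Send(out); err != nil {
				// Either the conn closed or the send buffer was
				// full and the message was dropped.
				return
			}
		}
	}
}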

// Close starts the shutdown sequence for the Conn. It is safe to call
// multiple times, never blocks, and notifies Events.ConnClosed once shutdown
// completes.
func (c *Conn) Close() {
if !c.closed.CAS(false, true) {
return
}
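// Shut down asynchronously so Close returns immediately: signal both loops
// to exit, unblock any in-flight socket reads/writes by closing nc, and
// notify listeners only after both loops have drained.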
go func() {
close(c.done)
c.nc.Close()
c.wg.Wait()
c.events.ConnClosed(c)
}()
}

// IsClosed returns true if c is closed.
func (c *Conn) IsClosed() bool {
return c.closed.Load()
}

func (c *Conn) readPayload(length int32) ([]byte, error) {
if err := c.bandwidth.ReserveIngress(int64(length)); err != nil {
c.log().Errorf("Error reserving ingress bandwidth for piece payload: %s", err)
return nil, fmt.Errorf("ingress bandwidth: %s", err)
}
payload := make([]byte, length)
if _, err := io.ReadFull(c.nc, payload); err != nil {
return nil, err
}
c.countBandwidth("ingress", int64(8*length))
return payload, nil
}

func (c *Conn) readMessage() (*Message, error) {
p2pMessage, err := readMessage(c.nc)
if err != nil {
return nil, fmt.Errorf("read message: %s", err)
}
var pr storage.PieceReader
if p2pMessage.Type == p2p.Message_PIECE_PAYLOAD {
// For payload messages, we must read the actual payload from the connection
// after reading the message.
payload, err := c.readPayload(p2pMessage.PiecePayload.Length)
if err != nil {
return nil, fmt.Errorf("read payload: %s", err)
}
// TODO(codyg): Consider making this reader read directly from the socket.
pr = piecereader.NewBuffer(payload)
}
return &Message{p2pMessage, pr}, nil
}

// readLoop reads messages off the underlying connection and sends them to the
// receiver channel.
func (c *Conn) readLoop() {
defer func() {
close(c.receiver)
c.wg.Done()
c.Close()
}()
for {
select {
case <-c.done:
return
default:
msg, err := c.readMessage()
if err != nil {
c.log().Infof("Error reading message from socket, exiting read loop: %s", err)
return
}
c.receiver <- msg
}
}
}

func (c *Conn) sendPiecePayload(pr storage.PieceReader) error {
defer pr.Close()
if err := c.bandwidth.ReserveEgress(int64(pr.Length())); err != nil {
// TODO(codyg): This is bad. Consider alerting here.
c.log().Errorf("Error reserving egress bandwidth for piece payload: %s", err)
return fmt.Errorf("egress bandwidth: %s", err)
}
n, err := io.Copy(c.nc, pr)
if err != nil {
return fmt.Errorf("copy to socket: %s", err)
}
c.countBandwidth("egress", 8*n)
return nil
}

func (c *Conn) sendMessage(msg *Message) error {
if err := sendMessage(c.nc, msg.Message); err != nil {
return fmt.Errorf("send message: %s", err)
}
if msg.Message.Type == p2p.Message_PIECE_PAYLOAD {
// For payload messages, we must write the actual payload to the connection
// after writing the message.
if err := c.sendPiecePayload(msg.Payload); err != nil {
return fmt.Errorf("send piece payload: %s", err)
}
}
return nil
}

// writeLoop writes messages to the underlying connection by pulling them off
// the sender channel.
func (c *Conn) writeLoop() {
defer func() {
c.wg.Done()
c.Close()
}()
for {
select {
case <-c.done:
return
case msg := <-c.sender:
if err := c.sendMessage(msg); err != nil {
c.log().Infof("Error writing message to socket, exiting write loop: %s", err)
return
}
}
}
}
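
// countBandwidth records piece payload throughput in the given direction
// ("ingress" or "egress"). Callers pass bit counts, i.e. bytes multiplied by 8.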
func (c *Conn) countBandwidth(direction string, n int64) {
c.stats.Tagged(map[string]string{
"piece_bandwidth_direction": direction,
}).Counter("piece_bandwidth").Inc(n)
}

func (c *Conn) log(keysAndValues ...interface{}) *zap.SugaredLogger {
keysAndValues = append(keysAndValues, "remote_peer", c.peerID, "hash", c.infoHash)
return c.logger.With(keysAndValues...)
}
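
// Lifecycle sketch (illustrative): after a handshake, the owner calls newConn
// followed by Start, consumes inbound messages from Receiver until the channel
// closes, and writes outbound messages with Send. Close, or a socket error in
// either loop, tears the conn down and fires Events.ConnClosed exactly once.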