lib/torrent/scheduler/connstate/state.go (231 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package connstate
import (
"errors"
"time"
"github.com/andres-erbsen/clock"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"go.uber.org/zap"
)
// State errors.
var (
// ErrTorrentAtCapacity is returned by AddPending when the torrent already
// holds MaxOpenConnectionsPerTorrent pending or active conns.
ErrTorrentAtCapacity = errors.New("torrent is at capacity")
// ErrConnAlreadyPending is returned by AddPending when the conn is already
// pending.
ErrConnAlreadyPending = errors.New("conn is already pending")
// ErrConnAlreadyActive is returned by AddPending when the conn is already
// active.
ErrConnAlreadyActive = errors.New("conn is already active")
// ErrConnClosed is returned by MovePendingToActive when the supplied conn
// reports itself as closed.
ErrConnClosed = errors.New("conn is closed")
// ErrInvalidActiveTransition is returned by MovePendingToActive when the
// conn is not currently pending.
ErrInvalidActiveTransition = errors.New("conn must be pending to transition to active")
// ErrTooManyMutualConns is returned by AddPending when more than
// MaxMutualConnections of the supplied neighbors already have a pending or
// active conn for the torrent.
ErrTooManyMutualConns = errors.New("conn has too many mutual connections")
// errUnknownStatus is returned by AddPending when an entry carries a status
// other than _uninit, _pending or _active.
// This should NEVER happen.
errUnknownStatus = errors.New("invariant violation: unknown status")
)
// status is the lifecycle status of a tracked conn entry.
type status int
const (
// _uninit indicates the connection is uninitialized. This is the default
// status for empty entries.
_uninit status = iota
// _pending indicates the connection has reserved capacity via AddPending but
// has not yet been moved to active.
_pending
// _active indicates the connection was moved from pending to active and has
// a conn attached to its entry.
_active
)
// entry is the tracked state of a single conn for one torrent/peer pair.
type entry struct {
// status is the lifecycle status of the conn. The zero value is _uninit.
status status
// conn is the established connection. It is set when the entry is moved to
// active; pending entries are stored without it.
conn *conn.Conn
}
// connKey identifies a connection by torrent info hash and peer id. It is the
// key type of the blacklist map.
type connKey struct {
hash core.InfoHash
peerID core.PeerID
}
// blacklistEntry records the instant at which a blacklisted connection stops
// being blacklisted.
type blacklistEntry struct {
	expiration time.Time
}

// Blacklisted reports whether the entry is still in effect at now, i.e.
// whether the expiration lies strictly after now.
func (b *blacklistEntry) Blacklisted(now time.Time) bool {
	return b.expiration.After(now)
}

// Remaining returns how long after now the entry expires. The result is zero
// or negative once the expiration has been reached.
func (b *blacklistEntry) Remaining(now time.Time) time.Duration {
	left := b.expiration.Sub(now)
	return left
}
// State provides connection lifecycle management and enforces connection
// limits. A connection to a peer is identified by torrent info hash and peer id.
// Each connection may exist in the following states: pending, active, or
// blacklisted. Pending connections are unestablished connections which "reserve"
// connection capacity until they are done handshaking. Active connections are
// established connections. Blacklisted connections are failed connections which
// should be skipped in each peer handout.
//
// Note, State is NOT thread-safe. Synchronization must be provided by the client.
type State struct {
// config holds the connection limits and blacklist settings, with defaults
// applied by New.
config Config
// clk is the time source used to compute and check blacklist expirations.
clk clock.Clock
// netevents receives an event when a conn is blacklisted, becomes active, or
// is dropped from active.
netevents networkevent.Producer
// localPeerID is passed as the local peer in every produced network event.
localPeerID core.PeerID
// logger is the base logger; the log helper attaches per-call fields to it.
logger *zap.SugaredLogger
// All pending or active conns. These count towards conn capacity.
conns map[core.InfoHash]map[core.PeerID]entry
// All blacklisted conns. These do not count towards conn capacity.
blacklist map[connKey]*blacklistEntry
}
// New constructs a State with empty conn and blacklist tables. Defaults are
// applied to config before it is stored.
func New(
	config Config,
	clk clock.Clock,
	localPeerID core.PeerID,
	netevents networkevent.Producer,
	logger *zap.SugaredLogger) *State {

	s := new(State)
	s.config = config.applyDefaults()
	s.clk = clk
	s.netevents = netevents
	s.localPeerID = localPeerID
	s.logger = logger
	s.conns = make(map[core.InfoHash]map[core.PeerID]entry)
	s.blacklist = make(map[connKey]*blacklistEntry)
	return s
}
// ActiveConns collects the conn of every entry that is currently active,
// across all torrents. The result is nil when there are none, and its order
// follows map iteration and is therefore unspecified.
func (s *State) ActiveConns() []*conn.Conn {
	var result []*conn.Conn
	for _, byPeer := range s.conns {
		for _, ent := range byPeer {
			if ent.status != _active {
				continue
			}
			result = append(result, ent.conn)
		}
	}
	return result
}
// Saturated reports whether the number of active conns for h equals
// MaxOpenConnectionsPerTorrent. A torrent with no tracked conns is never
// saturated.
func (s *State) Saturated(h core.InfoHash) bool {
	byPeer, tracked := s.conns[h]
	if !tracked {
		return false
	}
	numActive := 0
	for _, ent := range byPeer {
		if ent.status != _active {
			continue
		}
		numActive++
	}
	return s.config.MaxOpenConnectionsPerTorrent == numActive
}
// Blacklist marks peerID/h as blacklisted until BlacklistDuration from now,
// logs the change and produces a blacklist network event. It is a no-op
// returning nil when DisableBlacklist is set, and returns an error when an
// unexpired blacklist entry already exists for peerID/h.
func (s *State) Blacklist(peerID core.PeerID, h core.InfoHash) error {
	if s.config.DisableBlacklist {
		return nil
	}

	key := connKey{hash: h, peerID: peerID}
	existing, found := s.blacklist[key]
	if found && existing.Blacklisted(s.clk.Now()) {
		return errors.New("conn is already blacklisted")
	}

	// An expired entry, if any, is simply overwritten.
	expiry := s.clk.Now().Add(s.config.BlacklistDuration)
	s.blacklist[key] = &blacklistEntry{expiration: expiry}

	l := s.log("peer", peerID, "hash", h)
	l.Infof("Connection blacklisted for %s", s.config.BlacklistDuration)

	event := networkevent.BlacklistConnEvent(h, s.localPeerID, peerID, s.config.BlacklistDuration)
	s.netevents.Produce(event)
	return nil
}
// Blacklisted reports whether peerID/h has a blacklist entry that has not yet
// expired according to the State's clock.
func (s *State) Blacklisted(peerID core.PeerID, h core.InfoHash) bool {
	be, found := s.blacklist[connKey{hash: h, peerID: peerID}]
	if !found {
		return false
	}
	return be.Blacklisted(s.clk.Now())
}
// ClearBlacklist removes every blacklist entry, expired or not, whose info
// hash is h.
func (s *State) ClearBlacklist(h core.InfoHash) {
	for key := range s.blacklist {
		if key.hash != h {
			continue
		}
		// Deleting the current key while ranging over a map is safe in Go.
		delete(s.blacklist, key)
	}
}
// AddPending reserves capacity for peerID/h by recording it as pending.
//
// It returns ErrTorrentAtCapacity when h already tracks
// MaxOpenConnectionsPerTorrent conns (this is checked before anything else),
// ErrConnAlreadyPending or ErrConnAlreadyActive when peerID/h is already
// tracked, and ErrTooManyMutualConns when more than MaxMutualConnections of
// neighbors already have a pending or active conn for h.
func (s *State) AddPending(peerID core.PeerID, h core.InfoHash, neighbors []core.PeerID) error {
	if len(s.conns[h]) == s.config.MaxOpenConnectionsPerTorrent {
		return ErrTorrentAtCapacity
	}

	// Reject anything that is not a brand new conn.
	switch current := s.get(h, peerID).status; current {
	case _pending:
		return ErrConnAlreadyPending
	case _active:
		return ErrConnAlreadyActive
	case _uninit:
		// New conn; continue below.
	default:
		return errUnknownStatus
	}

	if mutual := s.numMutualConns(h, neighbors); mutual > s.config.MaxMutualConnections {
		return ErrTooManyMutualConns
	}

	s.put(h, peerID, entry{status: _pending})
	l := s.log("hash", h, "peer", peerID)
	l.Infof("Added pending conn, capacity now at %d", s.capacity(h))
	return nil
}
// DeletePending removes the pending conn for peerID/h, releasing the capacity
// it reserved. It does nothing when peerID/h is not pending.
func (s *State) DeletePending(peerID core.PeerID, h core.InfoHash) {
	if s.get(h, peerID).status == _pending {
		s.delete(h, peerID)
		l := s.log("hash", h, "peer", peerID)
		l.Infof("Deleted pending conn, capacity now at %d", s.capacity(h))
	}
}
// MovePendingToActive promotes the pending entry for c's info hash and peer id
// to active, attaches c to it, and produces an add-active-conn network event.
//
// It returns ErrConnClosed when c is closed and ErrInvalidActiveTransition
// when the entry is not currently pending.
func (s *State) MovePendingToActive(c *conn.Conn) error {
	if c.IsClosed() {
		return ErrConnClosed
	}

	h, peerID := c.InfoHash(), c.PeerID()
	if s.get(h, peerID).status != _pending {
		return ErrInvalidActiveTransition
	}

	s.put(h, peerID, entry{status: _active, conn: c})
	s.log("hash", h, "peer", peerID).Info("Moved conn from pending to active")

	event := networkevent.AddActiveConnEvent(h, s.localPeerID, peerID)
	s.netevents.Produce(event)
	return nil
}
// DeleteActive removes c from the active conns and produces a drop-active-conn
// network event. It does nothing when the entry for c's info hash and peer id
// is not active, or when that entry holds a different conn than c.
func (s *State) DeleteActive(c *conn.Conn) {
	h, peerID := c.InfoHash(), c.PeerID()

	current := s.get(h, peerID)
	// The identity check matters: a newer conn may share the same hash/peer as
	// the old one, and only the exact conn passed in may be deleted.
	if current.status != _active || current.conn != c {
		return
	}

	s.delete(h, peerID)
	l := s.log("hash", h, "peer", peerID)
	l.Infof("Deleted active conn, capacity now at %d", s.capacity(h))

	event := networkevent.DropActiveConnEvent(h, s.localPeerID, peerID)
	s.netevents.Produce(event)
}
// numMutualConns counts how many of neighbors already have a pending or active
// conn for h.
func (s *State) numMutualConns(h core.InfoHash, neighbors []core.PeerID) int {
	count := 0
	for i := range neighbors {
		switch s.get(h, neighbors[i]).status {
		case _pending, _active:
			count++
		}
	}
	return count
}
// BlacklistedConn represents a connection which has been blacklisted. It is
// the element type returned by BlacklistSnapshot and carries JSON tags for
// serialization.
type BlacklistedConn struct {
// PeerID is the id of the blacklisted peer.
PeerID core.PeerID `json:"peer_id"`
// InfoHash is the info hash of the torrent the blacklist entry applies to.
InfoHash core.InfoHash `json:"info_hash"`
// Remaining is the time left until the blacklist entry expires, measured
// when the snapshot was taken.
Remaining time.Duration `json:"remaining"`
}
// BlacklistSnapshot returns a snapshot of all valid (unexpired) blacklist
// entries. Every returned entry has a strictly positive Remaining. The result
// is nil when nothing is currently blacklisted, and its order follows map
// iteration and is therefore unspecified.
func (s *State) BlacklistSnapshot() []BlacklistedConn {
	// Read the clock once so every entry is measured against the same instant.
	now := s.clk.Now()

	var conns []BlacklistedConn
	for k, e := range s.blacklist {
		remaining := e.Remaining(now)
		if remaining <= 0 {
			// Nothing in this file removes entries when they expire, so
			// expired ones can still be in the map. They are no longer
			// blacklisted (see blacklistEntry.Blacklisted) and must not be
			// reported.
			continue
		}
		conns = append(conns, BlacklistedConn{
			PeerID:    k.peerID,
			InfoHash:  k.hash,
			Remaining: remaining,
		})
	}
	return conns
}
// get returns the entry tracked for peerID under h, or the zero entry (status
// _uninit, nil conn) when none is tracked.
func (s *State) get(h core.InfoHash, peerID core.PeerID) entry {
	// Indexing a missing outer key yields a nil inner map, and indexing a nil
	// map yields the zero value, so no explicit presence check is needed.
	return s.conns[h][peerID]
}
// put stores e as the entry for peerID under h, creating the per-torrent map
// on first use and overwriting any existing entry.
func (s *State) put(h core.InfoHash, peerID core.PeerID, e entry) {
	if _, exists := s.conns[h]; !exists {
		s.conns[h] = make(map[core.PeerID]entry)
	}
	s.conns[h][peerID] = e
}
// delete removes the entry for peerID under h, if any, and drops the
// per-torrent map once it becomes empty so that s.conns only holds torrents
// with at least one tracked conn.
func (s *State) delete(h core.InfoHash, peerID core.PeerID) {
	if byPeer, tracked := s.conns[h]; tracked {
		delete(byPeer, peerID)
		if len(byPeer) == 0 {
			delete(s.conns, h)
		}
	}
}
// capacity returns how many more conns h can hold: MaxOpenConnectionsPerTorrent
// minus the number of pending and active conns currently tracked for h.
func (s *State) capacity(h core.InfoHash) int {
	used := len(s.conns[h])
	return s.config.MaxOpenConnectionsPerTorrent - used
}
// log returns the State's logger with args attached via With. Callers in this
// file pass alternating key/value pairs such as "hash", h, "peer", peerID.
func (s *State) log(args ...interface{}) *zap.SugaredLogger {
return s.logger.With(args...)
}