lib/torrent/scheduler/events.go (364 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"time"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"github.com/uber/kraken/lib/torrent/scheduler/connstate"
"github.com/uber/kraken/lib/torrent/scheduler/dispatch"
"github.com/uber/kraken/lib/torrent/storage"
"github.com/uber/kraken/utils/memsize"
"github.com/uber/kraken/utils/timeutil"
"github.com/willf/bitset"
)
// event describes an external event which modifies state. While the event is
// applying, it is guaranteed to be the only accessor of state.
type event interface {
apply(*state)
}
// eventLoop represents a serialized list of events to be applied to scheduler
// state.
type eventLoop interface {
send(event) bool
sendTimeout(e event, timeout time.Duration) error
run(*state)
stop()
}
type baseEventLoop struct {
events chan event
done chan struct{}
}
func newEventLoop() *baseEventLoop {
return &baseEventLoop{
events: make(chan event),
done: make(chan struct{}),
}
}
// send sends a new event into l. Should never be called by the same goroutine
// running l (i.e. within apply methods), else deadlock will occur. Returns false
// if the l is not running.
func (l *baseEventLoop) send(e event) bool {
select {
case l.events <- e:
return true
case <-l.done:
return false
}
}
func (l *baseEventLoop) sendTimeout(e event, timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case l.events <- e:
return nil
case <-l.done:
return ErrSchedulerStopped
case <-timer.C:
return ErrSendEventTimedOut
}
}
func (l *baseEventLoop) run(s *state) {
for {
select {
case e := <-l.events:
e.apply(s)
case <-l.done:
return
}
}
}
func (l *baseEventLoop) stop() {
close(l.done)
}
type liftedEventLoop struct {
eventLoop
}
// liftEventLoop lifts events from subpackages into an eventLoop.
func liftEventLoop(l eventLoop) *liftedEventLoop {
return &liftedEventLoop{l}
}
func (l *liftedEventLoop) ConnClosed(c *conn.Conn) {
l.send(connClosedEvent{c})
}
func (l *liftedEventLoop) DispatcherComplete(d *dispatch.Dispatcher) {
l.send(dispatcherCompleteEvent{d})
}
func (l *liftedEventLoop) PeerRemoved(peerID core.PeerID, h core.InfoHash) {
l.send(peerRemovedEvent{peerID, h})
}
func (l *liftedEventLoop) AnnounceTick() {
l.send(announceTickEvent{})
}
// connClosedEvent occurs when a connection is closed.
type connClosedEvent struct {
c *conn.Conn
}
// apply ejects the conn from the scheduler's active connections.
func (e connClosedEvent) apply(s *state) {
s.conns.DeleteActive(e.c)
if err := s.conns.Blacklist(e.c.PeerID(), e.c.InfoHash()); err != nil {
s.log("conn", e.c).Infof("Cannot blacklist active conn: %s", err)
}
}
// incomingHandshakeEvent when a handshake was received from a new connection.
type incomingHandshakeEvent struct {
pc *conn.PendingConn
}
// apply rejects incoming handshakes when the scheduler is at capacity. If the
// scheduler has capacity for more connections, adds the peer/hash of the handshake
// to the scheduler's pending connections and asynchronously attempts to establish
// the connection.
func (e incomingHandshakeEvent) apply(s *state) {
peerNeighbors := make([]core.PeerID, len(e.pc.RemoteBitfields()))
var i int
for peerID := range e.pc.RemoteBitfields() {
peerNeighbors[i] = peerID
i++
}
if err := s.conns.AddPending(e.pc.PeerID(), e.pc.InfoHash(), peerNeighbors); err != nil {
s.log("peer", e.pc.PeerID(), "hash", e.pc.InfoHash()).Infof(
"Rejecting incoming handshake: %s", err)
s.sched.torrentlog.IncomingConnectionReject(e.pc.Digest(), e.pc.InfoHash(), e.pc.PeerID(), err)
e.pc.Close()
return
}
var rb conn.RemoteBitfields
if ctrl, ok := s.torrentControls[e.pc.InfoHash()]; ok {
rb = ctrl.dispatcher.RemoteBitfields()
}
go s.sched.establishIncomingHandshake(e.pc, rb)
}
// failedIncomingHandshakeEvent occurs when a pending incoming connection fails
// to handshake.
type failedIncomingHandshakeEvent struct {
peerID core.PeerID
infoHash core.InfoHash
}
func (e failedIncomingHandshakeEvent) apply(s *state) {
s.conns.DeletePending(e.peerID, e.infoHash)
}
// incomingConnEvent occurs when a pending incoming connection finishes handshaking.
type incomingConnEvent struct {
namespace string
c *conn.Conn
bitfield *bitset.BitSet
info *storage.TorrentInfo
}
// apply transitions a fully-handshaked incoming conn from pending to active.
func (e incomingConnEvent) apply(s *state) {
if err := s.addIncomingConn(e.namespace, e.c, e.bitfield, e.info); err != nil {
s.log("conn", e.c).Errorf("Error adding incoming conn: %s", err)
e.c.Close()
return
}
s.log("conn", e.c).Info("Added incoming conn")
}
// failedOutgoingHandshakeEvent occurs when a pending incoming connection fails
// to handshake.
type failedOutgoingHandshakeEvent struct {
peerID core.PeerID
infoHash core.InfoHash
}
func (e failedOutgoingHandshakeEvent) apply(s *state) {
s.conns.DeletePending(e.peerID, e.infoHash)
if err := s.conns.Blacklist(e.peerID, e.infoHash); err != nil {
s.log("peer", e.peerID, "hash", e.infoHash).Infof("Cannot blacklist pending conn: %s", err)
}
}
// outgoingConnEvent occurs when a pending outgoing connection finishes handshaking.
type outgoingConnEvent struct {
c *conn.Conn
bitfield *bitset.BitSet
info *storage.TorrentInfo
}
// apply transitions a fully-handshaked outgoing conn from pending to active.
func (e outgoingConnEvent) apply(s *state) {
if err := s.addOutgoingConn(e.c, e.bitfield, e.info); err != nil {
s.log("conn", e.c).Errorf("Error adding outgoing conn: %s", err)
e.c.Close()
return
}
s.log("conn", e.c).Infof("Added outgoing conn with %d%% downloaded", e.info.PercentDownloaded())
}
// announceTickEvent occurs when it is time to announce to the tracker.
type announceTickEvent struct{}
// apply pulls the next dispatcher from the announce queue and asynchronously
// makes an announce request to the tracker.
func (e announceTickEvent) apply(s *state) {
var skipped []core.InfoHash
for {
h, ok := s.announceQueue.Next()
if !ok {
s.log().Debug("No torrents in announce queue")
break
}
if s.conns.Saturated(h) {
s.log("hash", h).Debug("Skipping announce for fully saturated torrent")
skipped = append(skipped, h)
continue
}
ctrl, ok := s.torrentControls[h]
if !ok {
s.log("hash", h).Error("Pulled unknown torrent off announce queue")
continue
}
go s.sched.announce(
ctrl.dispatcher.Digest(), ctrl.dispatcher.InfoHash(), ctrl.dispatcher.Complete())
break
}
// Re-enqueue any torrents we pulled off and ignored, else we would never
// announce them again.
for _, h := range skipped {
s.announceQueue.Ready(h)
}
}
// announceResultEvent occurs when a successfully announced response was received
// from the tracker.
type announceResultEvent struct {
infoHash core.InfoHash
peers []*core.PeerInfo
}
// apply selects new peers returned via an announce response to open connections to
// if there is capacity. These connections are added to the scheduler's pending
// connections and handshaked asynchronously.
//
// Also marks the dispatcher as ready to announce again.
func (e announceResultEvent) apply(s *state) {
ctrl, ok := s.torrentControls[e.infoHash]
if !ok {
s.log("hash", e.infoHash).Info("Dispatcher closed after announce response received")
return
}
s.announceQueue.Ready(e.infoHash)
if ctrl.dispatcher.Complete() {
// Torrent is already complete, don't open any new connections.
return
}
for _, p := range e.peers {
if p.PeerID == s.sched.pctx.PeerID {
// Tracker may return our own peer.
continue
}
if s.conns.Blacklisted(p.PeerID, e.infoHash) {
continue
}
if err := s.conns.AddPending(p.PeerID, e.infoHash, nil); err != nil {
if err == connstate.ErrTorrentAtCapacity {
break
}
continue
}
go s.sched.initializeOutgoingHandshake(
p, ctrl.dispatcher.Stat(), ctrl.dispatcher.RemoteBitfields(), ctrl.namespace)
}
}
// announceErrEvent occurs when an announce request fails.
type announceErrEvent struct {
infoHash core.InfoHash
err error
}
// apply marks the dispatcher as ready to announce again.
func (e announceErrEvent) apply(s *state) {
s.log("hash", e.infoHash).Errorf("Error announcing: %s", e.err)
s.announceQueue.Ready(e.infoHash)
}
// newTorrentEvent occurs when a new torrent was requested for download.
type newTorrentEvent struct {
namespace string
torrent storage.Torrent
errc chan error
}
// apply begins seeding / leeching a new torrent.
func (e newTorrentEvent) apply(s *state) {
ctrl, ok := s.torrentControls[e.torrent.InfoHash()]
if !ok {
var err error
ctrl, err = s.addTorrent(e.namespace, e.torrent, true)
if err != nil {
e.errc <- err
return
}
s.log("torrent", e.torrent).Info("Added new torrent")
}
if ctrl.dispatcher.Complete() {
e.errc <- nil
return
}
ctrl.errors = append(ctrl.errors, e.errc)
// Immediately announce new torrents.
go s.sched.announce(ctrl.dispatcher.Digest(), ctrl.dispatcher.InfoHash(), ctrl.dispatcher.Complete())
}
// dispatcherCompleteEvent occurs when a dispatcher finishes downloading its torrent.
type dispatcherCompleteEvent struct {
dispatcher *dispatch.Dispatcher
}
// apply marks the dispatcher for its final announce.
func (e dispatcherCompleteEvent) apply(s *state) {
infoHash := e.dispatcher.InfoHash()
s.conns.ClearBlacklist(infoHash)
s.announceQueue.Eject(infoHash)
ctrl, ok := s.torrentControls[infoHash]
if !ok {
s.log("dispatcher", e.dispatcher).Error("Completed dispatcher not found")
return
}
for _, errc := range ctrl.errors {
errc <- nil
}
if ctrl.localRequest {
// Normalize the download time for all torrent sizes to a per MB value.
// Skip torrents that are less than a MB in size because we can't measure
// at that granularity.
downloadTime := s.sched.clock.Now().Sub(ctrl.dispatcher.CreatedAt())
lengthMB := ctrl.dispatcher.Length() / int64(memsize.MB)
if lengthMB > 0 {
s.sched.stats.Timer("download_time_per_mb").Record(downloadTime / time.Duration(lengthMB))
}
}
s.log("hash", infoHash).Info("Torrent complete")
s.sched.netevents.Produce(networkevent.TorrentCompleteEvent(infoHash, s.sched.pctx.PeerID))
// Immediately announce completed torrents.
go s.sched.announce(ctrl.dispatcher.Digest(), ctrl.dispatcher.InfoHash(), true)
}
// peerRemovedEvent occurs when a dispatcher removes a peer with a closed
// connection. Currently is a no-op.
type peerRemovedEvent struct {
peerID core.PeerID
infoHash core.InfoHash
}
func (e peerRemovedEvent) apply(s *state) {}
// preemptionTickEvent occurs periodically to preempt unneeded conns and remove
// idle torrentControls.
type preemptionTickEvent struct{}
func (e preemptionTickEvent) apply(s *state) {
for _, c := range s.conns.ActiveConns() {
ctrl, ok := s.torrentControls[c.InfoHash()]
if !ok {
s.log("conn", c).Error(
"Invariant violation: active conn not assigned to dispatcher")
c.Close()
continue
}
lastProgress := timeutil.MostRecent(
c.CreatedAt(),
ctrl.dispatcher.LastGoodPieceReceived(c.PeerID()),
ctrl.dispatcher.LastPieceSent(c.PeerID()))
if s.sched.clock.Now().Sub(lastProgress) > s.sched.config.ConnTTI {
s.log("conn", c).Info("Closing idle conn")
c.Close()
continue
}
if s.sched.clock.Now().Sub(c.CreatedAt()) > s.sched.config.ConnTTL {
s.log("conn", c).Info("Closing expired conn")
c.Close()
continue
}
}
for h, ctrl := range s.torrentControls {
idleSeeder :=
ctrl.dispatcher.Complete() &&
s.sched.clock.Now().Sub(ctrl.dispatcher.LastReadTime()) >= s.sched.config.SeederTTI
if idleSeeder {
s.sched.torrentlog.SeedTimeout(ctrl.dispatcher.Digest(), h)
}
idleLeecher :=
!ctrl.dispatcher.Complete() &&
s.sched.clock.Now().Sub(ctrl.dispatcher.LastWriteTime()) >= s.sched.config.LeecherTTI
if idleLeecher {
s.sched.torrentlog.LeechTimeout(ctrl.dispatcher.Digest(), h)
}
if idleSeeder || idleLeecher {
s.log("hash", h, "inprogress", !ctrl.dispatcher.Complete()).Info("Removing idle torrent")
s.removeTorrent(h, ErrTorrentTimeout)
}
}
}
// emitStatsEvent occurs periodically to emit scheduler stats.
type emitStatsEvent struct{}
func (e emitStatsEvent) apply(s *state) {
s.sched.stats.Gauge("torrents").Update(float64(len(s.torrentControls)))
}
type blacklistSnapshotEvent struct {
result chan []connstate.BlacklistedConn
}
func (e blacklistSnapshotEvent) apply(s *state) {
e.result <- s.conns.BlacklistSnapshot()
}
// removeTorrentEvent occurs when a torrent is manually removed via scheduler API.
type removeTorrentEvent struct {
digest core.Digest
errc chan error
}
func (e removeTorrentEvent) apply(s *state) {
for h, ctrl := range s.torrentControls {
if ctrl.dispatcher.Digest() == e.digest {
s.log(
"hash", h,
"inprogress", !ctrl.dispatcher.Complete()).Info("Removing torrent")
s.removeTorrent(h, ErrTorrentRemoved)
}
}
e.errc <- s.sched.torrentArchive.DeleteTorrent(e.digest)
}
// probeEvent occurs when a probe is manually requested via scheduler API.
// The event loop is unbuffered, so if a probe can be successfully sent, then
// the event loop is healthy.
type probeEvent struct{}
func (e probeEvent) apply(*state) {}
// shutdownEvent stops the event loop and tears down all active torrents and
// connections.
type shutdownEvent struct{}
func (e shutdownEvent) apply(s *state) {
for _, c := range s.conns.ActiveConns() {
s.log("conn", c).Info("Closing conn to stop scheduler")
c.Close()
}
// Notify local clients of pending torrents that they will not complete.
for _, ctrl := range s.torrentControls {
ctrl.dispatcher.TearDown()
for _, errc := range ctrl.errors {
errc <- ErrSchedulerStopped
}
}
s.sched.eventLoop.stop()
}