lib/torrent/scheduler/scheduler.go (307 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler

import (
	"errors"
	"fmt"
	"net"
	"sync"
	"time"

	"github.com/andres-erbsen/clock"
	"github.com/uber-go/tally"
	"go.uber.org/zap"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/lib/torrent/networkevent"
	"github.com/uber/kraken/lib/torrent/scheduler/announcequeue"
	"github.com/uber/kraken/lib/torrent/scheduler/announcer"
	"github.com/uber/kraken/lib/torrent/scheduler/conn"
	"github.com/uber/kraken/lib/torrent/scheduler/connstate"
	"github.com/uber/kraken/lib/torrent/scheduler/torrentlog"
	"github.com/uber/kraken/lib/torrent/storage"
	"github.com/uber/kraken/tracker/announceclient"
	"github.com/uber/kraken/utils/log"
)

// Scheduler errors.
var (
	// ErrTorrentNotFound is returned by Download when the torrent archive
	// reports storage.ErrNotFound for the requested digest.
	ErrTorrentNotFound = errors.New("torrent not found")

	// ErrSchedulerStopped is returned by the public API when the event loop
	// refuses an event (i.e. send reports false).
	ErrSchedulerStopped = errors.New("scheduler has been stopped")

	// ErrTorrentTimeout is recognised by Download for metric tagging. It is not
	// produced in the code visible here -- presumably the event loop reports it
	// for torrents that time out; confirm against the event handlers.
	ErrTorrentTimeout = errors.New("torrent timed out")

	// ErrTorrentRemoved is recognised by Download for metric tagging. It is not
	// produced in the code visible here -- presumably reported to in-flight
	// downloads when RemoveTorrent is called; confirm against the event
	// handlers.
	ErrTorrentRemoved = errors.New("torrent manually removed")

	// ErrSendEventTimedOut is not produced in the code visible here --
	// presumably returned by the event loop's sendTimeout (used by Probe);
	// confirm against the event loop implementation.
	ErrSendEventTimedOut = errors.New("event loop send timed out")
)

// Scheduler defines operations for scheduler.
type Scheduler interface {
	// Stop shuts the scheduler down and waits for its loops to exit.
	Stop()

	// Download blocks until the download of the torrent for d in namespace
	// has succeeded or failed, and returns the outcome.
	Download(namespace string, d core.Digest) error

	// BlacklistSnapshot returns a snapshot of the current connection
	// blacklist.
	BlacklistSnapshot() ([]connstate.BlacklistedConn, error)

	// RemoveTorrent forcibly stops leeching / seeding the torrent for d and
	// removes the torrent from disk.
	RemoveTorrent(d core.Digest) error

	// Probe verifies that the scheduler event loop is running and unblocked.
	Probe() error
}

// scheduler manages global state for the peer. This includes:
// - Opening torrents.
// - Announcing to the tracker.
// - Handshaking incoming connections.
// - Initializing outgoing connections.
// - Dispatching connections to torrents.
// - Pre-empting existing connections when better options are available (TODO).
type scheduler struct {
	// pctx identifies this peer; start reads its PeerID, IP and Port.
	pctx core.PeerContext
	// config is the configuration after applyDefaults has been applied.
	config Config
	// clock is the time source; tests may replace it via withClock.
	clock          clock.Clock
	torrentArchive storage.TorrentArchive
	// stats is the metrics scope, tagged with module=scheduler by newScheduler.
	stats tally.Scope

	handshaker *conn.Handshaker

	// eventLoop receives every event sent by the methods in this file.
	eventLoop *liftedEventLoop

	// listener is assigned by start and is nil until start has bound the port.
	listener net.Listener

	// preemptionTick is left nil when preemption is disabled, in which case
	// the corresponding select case in tickerLoop never fires.
	preemptionTick <-chan time.Time
	emitStatsTick  <-chan time.Time

	// TODO(codyg): We only need to hold on to this reference for reloading the
	// scheduler...
	announceClient announceclient.Client

	announcer *announcer.Announcer

	netevents networkevent.Producer

	torrentlog *torrentlog.Logger

	// logger is the base logger; use the log method to attach context fields.
	logger *zap.SugaredLogger

	// The following fields orchestrate the stopping of the scheduler.
	stopOnce sync.Once      // Ensures the stop sequence is executed only once.
	done     chan struct{}  // Closed by Stop; observed by tickerLoop and the announcer ticker.
	wg       sync.WaitGroup // Waits for the four loops launched by start to exit.
}

// schedOverrides defines scheduler fields which may be overridden for testing
// purposes.
type schedOverrides struct {
	clock     clock.Clock
	eventLoop eventLoop
}

// option mutates the schedOverrides consulted by newScheduler.
type option func(*schedOverrides)

// withClock makes newScheduler use c instead of the default clock.
func withClock(c clock.Clock) option {
	return func(o *schedOverrides) { o.clock = c }
}

// withEventLoop makes newScheduler use l instead of the default event loop.
func withEventLoop(l eventLoop) option {
	return func(o *schedOverrides) { o.eventLoop = l }
}

// newScheduler creates a scheduler. The returned scheduler is not running:
// no port is bound and no loops are launched until start is called.
func newScheduler( config Config, ta storage.TorrentArchive, stats tally.Scope, pctx core.PeerContext, announceClient announceclient.Client, netevents networkevent.Producer, options ...option) (*scheduler, error) { config = config.applyDefaults() logger, err := log.New(config.Log, nil) if err != nil { return nil, fmt.Errorf("log: %s", err) } slogger := logger.Sugar() done := make(chan struct{}) stats = stats.Tagged(map[string]string{ "module": "scheduler", }) overrides := schedOverrides{ clock: clock.New(), eventLoop: newEventLoop(), } for _, opt := range options { opt(&overrides) } eventLoop := liftEventLoop(overrides.eventLoop) var preemptionTick <-chan time.Time if !config.DisablePreemption { preemptionTick = overrides.clock.Tick(config.PreemptionInterval) } handshaker, err := conn.NewHandshaker( config.Conn, stats, overrides.clock, netevents, pctx.PeerID, eventLoop, slogger) if err != nil { return nil, fmt.Errorf("conn: %s", err) } tlog, err := torrentlog.New(config.TorrentLog, pctx) if err != nil { return nil, fmt.Errorf("torrentlog: %s", err) } s := &scheduler{ pctx: pctx, config: config, clock: overrides.clock, torrentArchive: ta, stats: stats, handshaker: handshaker, eventLoop: eventLoop, preemptionTick: preemptionTick, emitStatsTick: overrides.clock.Tick(config.EmitStatsInterval), announceClient: announceClient, announcer: announcer.Default(announceClient, eventLoop, overrides.clock, slogger), netevents: netevents, torrentlog: tlog, logger: slogger, done: done, } if config.DisablePreemption { s.log().Warn("Preemption disabled") } if config.ConnState.DisableBlacklist { s.log().Warn("Blacklisting disabled") } return s, nil } // start asynchronously starts all scheduler loops. // // Note: this has been split from the constructor so we can test against an // "unstarted" scheduler in certain cases. 
func (s *scheduler) start(aq announcequeue.Queue) error { s.log().Infof( "Scheduler starting as peer %s on addr %s:%d", s.pctx.PeerID, s.pctx.IP, s.pctx.Port) l, err := net.Listen("tcp", fmt.Sprintf(":%d", s.pctx.Port)) if err != nil { return err } s.listener = l s.wg.Add(4) go s.runEventLoop(aq) // Careful, this should be the only reference to aq. go s.listenLoop() go s.tickerLoop() go s.announceLoop() return nil } // Stop shuts down the scheduler. func (s *scheduler) Stop() { s.stopOnce.Do(func() { s.log().Info("Stopping scheduler...") close(s.done) s.listener.Close() s.eventLoop.send(shutdownEvent{}) // Waits for all loops to stop. s.wg.Wait() s.torrentlog.Sync() s.log().Info("Scheduler stopped") }) } func (s *scheduler) doDownload(namespace string, d core.Digest) (size int64, err error) { t, err := s.torrentArchive.CreateTorrent(namespace, d) if err != nil { if err == storage.ErrNotFound { return 0, ErrTorrentNotFound } return 0, fmt.Errorf("create torrent: %s", err) } // Buffer size of 1 so sends do not block. errc := make(chan error, 1) if !s.eventLoop.send(newTorrentEvent{namespace, t, errc}) { return 0, ErrSchedulerStopped } return t.Length(), <-errc } // Download downloads the torrent given metainfo. Once the torrent is downloaded, // it will begin seeding asynchronously. 
func (s *scheduler) Download(namespace string, d core.Digest) error { start := time.Now() size, err := s.doDownload(namespace, d) if err != nil { var errTag string switch err { case ErrTorrentNotFound: errTag = "not_found" case ErrTorrentTimeout: errTag = "timeout" case ErrSchedulerStopped: errTag = "scheduler_stopped" case ErrTorrentRemoved: errTag = "removed" default: errTag = "unknown" } s.stats.Tagged(map[string]string{ "error": errTag, }).Counter("download_errors").Inc(1) s.torrentlog.DownloadFailure(namespace, d, size, err) } else { downloadTime := time.Since(start) recordDownloadTime(s.stats, size, downloadTime) s.torrentlog.DownloadSuccess(namespace, d, size, downloadTime) } return err } // BlacklistSnapshot returns a snapshot of the current connection blacklist. func (s *scheduler) BlacklistSnapshot() ([]connstate.BlacklistedConn, error) { result := make(chan []connstate.BlacklistedConn) if !s.eventLoop.send(blacklistSnapshotEvent{result}) { return nil, ErrSchedulerStopped } return <-result, nil } // RemoveTorrent forcibly stops leeching / seeding torrent for d and removes // the torrent from disk. func (s *scheduler) RemoveTorrent(d core.Digest) error { // Buffer size of 1 so sends do not block. errc := make(chan error, 1) if !s.eventLoop.send(removeTorrentEvent{d, errc}) { return ErrSchedulerStopped } return <-errc } // Probe verifies that the scheduler event loop is running and unblocked. func (s *scheduler) Probe() error { return s.eventLoop.sendTimeout(probeEvent{}, s.config.ProbeTimeout) } func (s *scheduler) runEventLoop(aq announcequeue.Queue) { defer s.wg.Done() s.eventLoop.run(newState(s, aq)) } // listenLoop accepts incoming connections. func (s *scheduler) listenLoop() { defer s.wg.Done() s.log().Infof("Listening on %s", s.listener.Addr().String()) for { nc, err := s.listener.Accept() if err != nil { // TODO Need some way to make this gracefully exit. 
s.log().Infof("Error accepting new conn, exiting listen loop: %s", err) return } go func() { pc, err := s.handshaker.Accept(nc) if err != nil { s.log().Infof("Error accepting handshake, closing net conn: %s", err) nc.Close() return } s.eventLoop.send(incomingHandshakeEvent{pc}) }() } } // tickerLoop periodically emits various tick events. func (s *scheduler) tickerLoop() { defer s.wg.Done() for { select { case <-s.preemptionTick: s.eventLoop.send(preemptionTickEvent{}) case <-s.emitStatsTick: s.eventLoop.send(emitStatsEvent{}) case <-s.done: return } } } // announceLoop runs the announcer ticker. func (s *scheduler) announceLoop() { defer s.wg.Done() s.announcer.Ticker(s.done) } func (s *scheduler) announce(d core.Digest, h core.InfoHash, complete bool) { peers, err := s.announcer.Announce(d, h, complete) if err != nil { if err != announceclient.ErrDisabled { s.eventLoop.send(announceErrEvent{h, err}) } return } s.eventLoop.send(announceResultEvent{h, peers}) } func (s *scheduler) failIncomingHandshake(pc *conn.PendingConn, err error) { s.log( "peer", pc.PeerID(), "hash", pc.InfoHash()).Infof("Error accepting incoming handshake: %s", err) pc.Close() s.eventLoop.send(failedIncomingHandshakeEvent{pc.PeerID(), pc.InfoHash()}) } // establishIncomingHandshake attempts to establish a pending conn initialized // by a remote peer. Success / failure is communicated via events. 
func (s *scheduler) establishIncomingHandshake(pc *conn.PendingConn, rb conn.RemoteBitfields) { info, err := s.torrentArchive.Stat(pc.Namespace(), pc.Digest()) if err != nil { s.failIncomingHandshake(pc, fmt.Errorf("torrent stat: %s", err)) return } c, err := s.handshaker.Establish(pc, info, rb) if err != nil { s.failIncomingHandshake(pc, fmt.Errorf("establish handshake: %s", err)) return } s.torrentlog.IncomingConnectionAccept(pc.Digest(), pc.InfoHash(), pc.PeerID()) s.eventLoop.send(incomingConnEvent{pc.Namespace(), c, pc.Bitfield(), info}) } // initializeOutgoingHandshake attempts to initialize a conn to a remote peer. // Success / failure is communicated via events. func (s *scheduler) initializeOutgoingHandshake( p *core.PeerInfo, info *storage.TorrentInfo, rb conn.RemoteBitfields, namespace string) { addr := fmt.Sprintf("%s:%d", p.IP, p.Port) result, err := s.handshaker.Initialize(p.PeerID, addr, info, rb, namespace) if err != nil { s.log( "peer", p.PeerID, "hash", info.InfoHash(), "addr", addr).Infof("Error initializing outgoing handshake: %s", err) s.eventLoop.send(failedOutgoingHandshakeEvent{p.PeerID, info.InfoHash()}) s.torrentlog.OutgoingConnectionReject(info.Digest(), info.InfoHash(), p.PeerID, err) return } s.torrentlog.OutgoingConnectionAccept(info.Digest(), info.InfoHash(), p.PeerID) s.eventLoop.send(outgoingConnEvent{result.Conn, result.Bitfield, info}) } func (s *scheduler) log(args ...interface{}) *zap.SugaredLogger { return s.logger.With(args...) }