lib/torrent/scheduler/state.go (121 lines of code) (raw):
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package scheduler
import (
"errors"
"fmt"
"github.com/uber/kraken/core"
"github.com/uber/kraken/lib/torrent/networkevent"
"github.com/uber/kraken/lib/torrent/scheduler/announcequeue"
"github.com/uber/kraken/lib/torrent/scheduler/conn"
"github.com/uber/kraken/lib/torrent/scheduler/connstate"
"github.com/uber/kraken/lib/torrent/scheduler/dispatch"
"github.com/uber/kraken/lib/torrent/storage"
"go.uber.org/zap"
"github.com/willf/bitset"
)
// torrentControl bundles torrent control structures.
type torrentControl struct {
namespace string
dispatcher *dispatch.Dispatcher
errors []chan error
localRequest bool
}
// state is a superset of scheduler, which includes protected state which can
// only be accessed from the event loop. state is free to access scheduler fields
// and methods, however scheduler has no reference to state.
//
// Any network I/O, such as opening connections, does not belong at the state
// level. These operations should be defined as scheduler methods, and executed
// from a separate goroutine when calling from the event loop. Results from I/O
// may transform state by sending events into the event loop.
type state struct {
sched *scheduler
// Protected state.
torrentControls map[core.InfoHash]*torrentControl
conns *connstate.State
announceQueue announcequeue.Queue
}
func newState(s *scheduler, aq announcequeue.Queue) *state {
return &state{
sched: s,
torrentControls: make(map[core.InfoHash]*torrentControl),
conns: connstate.New(
s.config.ConnState, s.clock, s.pctx.PeerID, s.netevents, s.logger),
announceQueue: aq,
}
}
// addTorrent initializes a new torrentControl for t. Overwrites any existing
// torrentControl for t, so callers should check if one exists first.
func (s *state) addTorrent(
namespace string, t storage.Torrent, localRequest bool) (*torrentControl, error) {
d, err := dispatch.New(
s.sched.config.Dispatch,
s.sched.stats,
s.sched.clock,
s.sched.netevents,
s.sched.eventLoop,
s.sched.pctx.PeerID,
t,
s.sched.logger,
s.sched.torrentlog)
if err != nil {
return nil, fmt.Errorf("new dispatcher: %s", err)
}
ctrl := &torrentControl{
namespace: namespace,
dispatcher: d,
localRequest: localRequest,
}
s.announceQueue.Add(t.InfoHash())
s.sched.netevents.Produce(networkevent.AddTorrentEvent(
t.InfoHash(),
s.sched.pctx.PeerID,
t.Bitfield(),
s.sched.config.ConnState.MaxOpenConnectionsPerTorrent))
s.torrentControls[t.InfoHash()] = ctrl
return ctrl, nil
}
// removeTorrent tears down the torrentControl associated with h, sending err to
// all clients waiting on this torrent.
func (s *state) removeTorrent(h core.InfoHash, err error) {
ctrl, ok := s.torrentControls[h]
if !ok {
return
}
if !ctrl.dispatcher.Complete() {
ctrl.dispatcher.TearDown()
s.announceQueue.Eject(h)
for _, errc := range ctrl.errors {
errc <- err
}
s.sched.netevents.Produce(networkevent.TorrentCancelledEvent(h, s.sched.pctx.PeerID))
if err := s.sched.torrentArchive.DeleteTorrent(ctrl.dispatcher.Digest()); err != nil {
s.sched.log().Errorf("Error deleting torrent from archive: %s", err)
}
}
delete(s.torrentControls, h)
}
// addOutgoingConn adds a conn, initialized by us, to state. The conn must already
// be in a pending state, and the torrent control must already be initialized.
func (s *state) addOutgoingConn(c *conn.Conn, b *bitset.BitSet, info *storage.TorrentInfo) error {
if err := s.conns.MovePendingToActive(c); err != nil {
return fmt.Errorf("move pending to active: %s", err)
}
c.Start()
ctrl, ok := s.torrentControls[info.InfoHash()]
if !ok {
return errors.New("torrent controls must be created before sending handshake")
}
if err := ctrl.dispatcher.AddPeer(c.PeerID(), c.IsPeerOrigin(), b, c); err != nil {
return fmt.Errorf("add conn to dispatcher: %s", err)
}
return nil
}
// addIncomingConn adds a conn, initialized by a remote peer, to state. The conn
// must already be in a pending state. Initializes a torrent control if not
// present.
func (s *state) addIncomingConn(
namespace string, c *conn.Conn, b *bitset.BitSet, info *storage.TorrentInfo) error {
if err := s.conns.MovePendingToActive(c); err != nil {
return fmt.Errorf("move pending to active: %s", err)
}
c.Start()
ctrl, ok := s.torrentControls[info.InfoHash()]
if !ok {
t, err := s.sched.torrentArchive.GetTorrent(namespace, info.Digest())
if err != nil {
return fmt.Errorf("get torrent: %s", err)
}
ctrl, err = s.addTorrent(namespace, t, false)
if err != nil {
return err
}
}
if err := ctrl.dispatcher.AddPeer(c.PeerID(), c.IsPeerOrigin(), b, c); err != nil {
return fmt.Errorf("add conn to dispatcher: %s", err)
}
return nil
}
func (s *state) log(args ...interface{}) *zap.SugaredLogger {
return s.sched.log(args...)
}