lib/torrent/scheduler/dispatch/piecerequest/manager.go
// Copyright (c) 2016-2019 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package piecerequest

import (
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/uber/kraken/core"
	"github.com/uber/kraken/utils/syncutil"

	"github.com/andres-erbsen/clock"
	"github.com/willf/bitset"
)

// Status enumerates possible statuses of a Request.
type Status int

const (
	// StatusPending denotes a valid request which is still in-flight.
	StatusPending Status = iota

	// StatusExpired denotes an in-flight request which has timed out on our end.
	StatusExpired

	// StatusUnsent denotes an unsent request that is safe to retry to the same peer.
	StatusUnsent

	// StatusInvalid denotes a completed request that resulted in an invalid payload.
	StatusInvalid
)

// Request represents a piece request to a peer.
type Request struct {
	Piece  int
	PeerID core.PeerID
	Status Status

	sentAt time.Time
}

// Manager encapsulates thread-safe piece request bookkeeping. It is not
// responsible for sending or receiving pieces in any way.
type Manager struct {
	sync.RWMutex

	// requests and requestsByPeer hold the same data, just indexed differently.
	requests       map[int][]*Request
	requestsByPeer map[core.PeerID]map[int]*Request

	clock   clock.Clock
	timeout time.Duration

	policy pieceSelectionPolicy

	pipelineLimit int
}

// NewManager creates a new Manager.
func NewManager(
	clk clock.Clock,
	timeout time.Duration,
	policy string,
	pipelineLimit int) (*Manager, error) {

	m := &Manager{
		requests:       make(map[int][]*Request),
		requestsByPeer: make(map[core.PeerID]map[int]*Request),
		clock:          clk,
		timeout:        timeout,
		pipelineLimit:  pipelineLimit,
	}
	switch policy {
	case DefaultPolicy:
		m.policy = newDefaultPolicy()
	case RarestFirstPolicy:
		m.policy = newRarestFirstPolicy()
	default:
		return nil, fmt.Errorf("invalid piece selection policy: %s", policy)
	}
	return m, nil
}
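
// Construction sketch (illustrative, not part of the original file): a Manager
// wired with a real clock and the package's DefaultPolicy. The 15s timeout and
// pipeline limit of 3 in-flight requests per peer are assumed example values.
//
//	m, err := NewManager(clock.New(), 15*time.Second, DefaultPolicy, 3)
//	if err != nil {
//		// NewManager fails only on an unrecognized policy name.
//		panic(err)
//	}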

// ReservePieces selects the next piece(s) to be requested from the given peer,
// delegating piece selection to the configured policy (which may use the
// per-piece peer counts in numPeersByPiece, e.g. for rarest-first selection).
// If allowDuplicates is set, it may return pieces which have already been
// reserved under other peers.
func (m *Manager) ReservePieces(
	peerID core.PeerID,
	candidates *bitset.BitSet,
	numPeersByPiece syncutil.Counters,
	allowDuplicates bool) ([]int, error) {

	m.Lock()
	defer m.Unlock()

	quota := m.requestQuota(peerID)
	if quota <= 0 {
		return nil, nil
	}

	valid := func(i int) bool { return m.validRequest(peerID, i, allowDuplicates) }
	pieces, err := m.policy.selectPieces(quota, valid, candidates, numPeersByPiece)
	if err != nil {
		return nil, err
	}

	// Set as pending in requests map.
	for _, i := range pieces {
		r := &Request{
			Piece:  i,
			PeerID: peerID,
			Status: StatusPending,
			sentAt: m.clock.Now(),
		}
		m.requests[i] = append(m.requests[i], r)
		if _, ok := m.requestsByPeer[peerID]; !ok {
			m.requestsByPeer[peerID] = make(map[int]*Request)
		}
		m.requestsByPeer[peerID][i] = r
	}
	return pieces, nil
}
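
// Usage sketch (illustrative, not part of the original file): reserve pieces
// for a peer (peerID is a core.PeerID in scope) advertising pieces 0-9.
// syncutil.NewCounters is assumed to be the counters constructor; passing
// false disables duplicate reservations.
//
//	candidates := bitset.New(10)
//	for i := uint(0); i < 10; i++ {
//		candidates.Set(i)
//	}
//	pieces, err := m.ReservePieces(peerID, candidates, syncutil.NewCounters(10), false)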

// MarkUnsent marks the piece request for piece i as unsent.
func (m *Manager) MarkUnsent(peerID core.PeerID, i int) {
	m.markStatus(peerID, i, StatusUnsent)
}

// MarkInvalid marks the piece request for piece i as invalid.
func (m *Manager) MarkInvalid(peerID core.PeerID, i int) {
	m.markStatus(peerID, i, StatusInvalid)
}

// Clear deletes the piece request for piece i. Should be used for freeing up
// unneeded request bookkeeping.
func (m *Manager) Clear(i int) {
	m.Lock()
	defer m.Unlock()

	delete(m.requests, i)
	for peerID, pm := range m.requestsByPeer {
		delete(pm, i)
		if len(pm) == 0 {
			delete(m.requestsByPeer, peerID)
		}
	}
}

// PendingPieces returns the pieces for all pending requests to peerID in
// sorted order. Intended primarily for testing purposes.
func (m *Manager) PendingPieces(peerID core.PeerID) []int {
	m.RLock()
	defer m.RUnlock()

	var pieces []int
	for i, r := range m.requestsByPeer[peerID] {
		if r.Status == StatusPending {
			pieces = append(pieces, i)
		}
	}
	sort.Ints(pieces)
	return pieces
}

// ClearPeer deletes all piece requests for peerID.
func (m *Manager) ClearPeer(peerID core.PeerID) {
	m.Lock()
	defer m.Unlock()

	delete(m.requestsByPeer, peerID)
	for i, rs := range m.requests {
		for j, r := range rs {
			if r.PeerID == peerID {
				// Eject the request by swapping in the last element. A peer
				// holds at most one request per piece, so we can stop here.
				rs[j] = rs[len(rs)-1]
				m.requests[i] = rs[:len(rs)-1]
				break
			}
		}
	}
}

// GetFailedRequests returns a copy of all failed piece requests, i.e. those
// which are unsent, invalid, or have expired while pending.
func (m *Manager) GetFailedRequests() []Request {
	m.RLock()
	defer m.RUnlock()

	var failed []Request
	for _, rs := range m.requests {
		for _, r := range rs {
			status := r.Status
			if status == StatusPending && m.expired(r) {
				status = StatusExpired
			}
			if status != StatusPending {
				failed = append(failed, Request{
					Piece:  r.Piece,
					PeerID: r.PeerID,
					Status: status,
				})
			}
		}
	}
	return failed
}
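
// Retry sketch (illustrative, not part of the original file; the real
// dispatcher's handling may differ): a caller might periodically sweep failed
// requests, clearing their bookkeeping so the pieces become eligible for
// reservation again.
//
//	for _, r := range m.GetFailedRequests() {
//		m.Clear(r.Piece)
//		// ...then re-reserve r.Piece, e.g. via ReservePieces on another peer.
//	}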

// validRequest returns true if a request for piece i may be issued to peerID.
// A request is blocked if peerID already has a live (pending and unexpired)
// request for i, or if any other peer does and allowDuplicates is false.
func (m *Manager) validRequest(peerID core.PeerID, i int, allowDuplicates bool) bool {
	for _, r := range m.requests[i] {
		if r.Status == StatusPending && !m.expired(r) {
			if r.PeerID == peerID {
				return false
			}
			if !allowDuplicates {
				return false
			}
		}
	}
	return true
}

// requestQuota returns the number of new requests which may be issued to
// peerID: the pipeline limit minus the peer's live (pending and unexpired)
// requests.
func (m *Manager) requestQuota(peerID core.PeerID) int {
	quota := m.pipelineLimit
	pm, ok := m.requestsByPeer[peerID]
	if !ok {
		return quota
	}
	for _, r := range pm {
		if r.Status == StatusPending && !m.expired(r) {
			quota--
			if quota == 0 {
				break
			}
		}
	}
	return quota
}

// expired returns true if r was sent longer than the configured timeout ago.
func (m *Manager) expired(r *Request) bool {
	expiresAt := r.sentAt.Add(m.timeout)
	return m.clock.Now().After(expiresAt)
}

// markStatus sets the status of peerID's request for piece i, if one exists.
func (m *Manager) markStatus(peerID core.PeerID, i int, s Status) {
	m.Lock()
	defer m.Unlock()

	for _, r := range m.requests[i] {
		if r.PeerID == peerID {
			r.Status = s
		}
	}
}