tracker/peerstore/local.go
// Copyright (c) 2016-2020 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package peerstore

import (
"math/rand"
"sync"
"time"
"github.com/andres-erbsen/clock"
"github.com/uber/kraken/core"
_ "github.com/uber/kraken/utils/randutil" // For seeded global rand.
)

const (
_cleanupExpiredPeerEntriesInterval = 5 * time.Minute
_cleanupExpiredPeerGroupsInterval = time.Hour
)

// LocalStore is an in-memory Store implementation.
type LocalStore struct {
config LocalConfig
	clk    clock.Clock

	cleanupExpiredPeerEntriesTicker *clock.Ticker
	cleanupExpiredPeerGroupsTicker  *clock.Ticker

	stopOnce sync.Once
	stop     chan struct{}

	mu         sync.RWMutex
	peerGroups map[core.InfoHash]*peerGroup
}
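
// peerGroup tracks all peers announcing for a single torrent. Entries are
// indexed twice: peerList supports random sampling in GetPeers, while
// peerMap supports constant-time lookup in UpdatePeer.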
type peerGroup struct {
mu sync.RWMutex
// Same peerEntry references in both, just indexed differently.
peerList []*peerEntry
peerMap map[core.PeerID]*peerEntry

	// lastExpiresAt tracks the latest expiresAt across all entries, letting
	// cleanupExpiredPeerGroups check group validity without a full scan.
	lastExpiresAt time.Time

	// deleted marks a group which has been removed from LocalStore.peerGroups,
	// so holders of a stale reference know to reload it.
	deleted bool
}
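
// peerEntry is the stored announce state for a single peer, valid until
// expiresAt.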
type peerEntry struct {
id core.PeerID
ip string
port int
complete bool
expiresAt time.Time
}

// NewLocalStore creates a new LocalStore.
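//
// A minimal usage sketch (assuming a real clock via clock.New and a zero
// LocalConfig whose defaults are filled in by applyDefaults):
//
//	s := NewLocalStore(LocalConfig{}, clock.New())
//	defer s.Close()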
func NewLocalStore(config LocalConfig, clk clock.Clock) *LocalStore {
config.applyDefaults()
s := &LocalStore{
config: config,
clk: clk,
		// Tickers come from the injected clock so that fake clocks can drive
		// cleanup in tests.
		cleanupExpiredPeerEntriesTicker: clk.Ticker(_cleanupExpiredPeerEntriesInterval),
		cleanupExpiredPeerGroupsTicker:  clk.Ticker(_cleanupExpiredPeerGroupsInterval),
stop: make(chan struct{}),
peerGroups: make(map[core.InfoHash]*peerGroup),
}
go s.cleanupTask()
return s
}

// Close implements Store.
func (s *LocalStore) Close() {
s.stopOnce.Do(func() { close(s.stop) })
}

// GetPeers implements Store.
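// It returns up to n peers for h, sampled uniformly at random; results may
// include entries which have expired since the last cleanup pass.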
func (s *LocalStore) GetPeers(h core.InfoHash, n int) ([]*core.PeerInfo, error) {
s.mu.RLock()
g, ok := s.peerGroups[h]
s.mu.RUnlock()
if !ok {
return nil, nil
}
g.mu.RLock()
defer g.mu.RUnlock()
if len(g.peerList) < n {
n = len(g.peerList)
}
if n <= 0 {
return nil, nil
}
result := make([]*core.PeerInfo, 0, n)
// Select n random indexes.
indexes := rand.Perm(len(g.peerList))
indexes = indexes[:n]
for _, i := range indexes {
		// Note: we elect to return slightly expired entries rather than
		// iterate until we find n valid entries.
e := g.peerList[i]
result = append(result, core.NewPeerInfo(e.id, e.ip, e.port, false /* origin */, e.complete))
}
return result, nil
}

// UpdatePeer implements Store.
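// It inserts p if absent, else refreshes its fields, and in either case
// extends the entry's expiration by the configured TTL.
//
// A hedged announce sketch (peerID and hash are hypothetical values):
//
//	p := core.NewPeerInfo(peerID, "10.0.0.1", 8080, false /* origin */, true /* complete */)
//	err := s.UpdatePeer(hash, p)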
func (s *LocalStore) UpdatePeer(h core.InfoHash, p *core.PeerInfo) error {
g := s.getOrInitLockedPeerGroup(h)
defer g.mu.Unlock()
e, ok := g.peerMap[p.PeerID]
if !ok {
e = &peerEntry{}
g.peerList = append(g.peerList, e)
g.peerMap[p.PeerID] = e
}
e.id = p.PeerID
e.ip = p.IP
e.port = p.Port
e.complete = p.Complete
e.expiresAt = s.clk.Now().Add(s.config.TTL)
// Allows cleanupExpiredPeerGroups to quickly determine when the last
// peerEntry expires.
g.lastExpiresAt = e.expiresAt
return nil
}
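
// getOrInitLockedPeerGroup returns the peerGroup for h, creating it if it
// does not exist. The returned group's mu is held in write mode; the caller
// is responsible for unlocking it.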
func (s *LocalStore) getOrInitLockedPeerGroup(h core.InfoHash) *peerGroup {
// We must take care to handle a race condition against
// cleanupExpiredPeerGroups. Consider two goroutines, A and B, where A
// executes getOrInitLockedPeerGroup and B executes
// cleanupExpiredPeerGroups:
//
// A: locks s.mu, reads g from s.peerGroups, unlocks s.mu
// B: locks s.mu, locks g.mu, deletes g from s.peerGroups, unlocks g.mu
// A: locks g.mu
//
// At this point, A is holding onto a peerGroup reference which has been
// deleted from the peerGroups map, and thus has no choice but to attempt to
	// reload a new peerGroup. Since the cleanup interval is quite large, it is
	// *extremely* unlikely that this loop will execute more than twice.
for {
s.mu.Lock()
g, ok := s.peerGroups[h]
if !ok {
g = &peerGroup{
peerMap: make(map[core.PeerID]*peerEntry),
lastExpiresAt: s.clk.Now().Add(s.config.TTL),
}
s.peerGroups[h] = g
}
s.mu.Unlock()
g.mu.Lock()
if g.deleted {
g.mu.Unlock()
continue
}
return g
}
}
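
// cleanupTask runs the periodic cleanup loops until Close is called.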
func (s *LocalStore) cleanupTask() {
for {
select {
case <-s.cleanupExpiredPeerEntriesTicker.C:
s.cleanupExpiredPeerEntries()
case <-s.cleanupExpiredPeerGroupsTicker.C:
s.cleanupExpiredPeerGroups()
case <-s.stop:
return
}
}
}
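
// cleanupExpiredPeerEntries scans every group for expired entries under a
// read lock, then removes them under a write lock, re-checking expiration in
// case of concurrent updates.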
func (s *LocalStore) cleanupExpiredPeerEntries() {
s.mu.RLock()
groups := make([]*peerGroup, 0, len(s.peerGroups))
for _, g := range s.peerGroups {
groups = append(groups, g)
}
s.mu.RUnlock()
for _, g := range groups {
var expired []int
g.mu.RLock()
for i, e := range g.peerList {
if s.clk.Now().After(e.expiresAt) {
expired = append(expired, i)
}
}
g.mu.RUnlock()
if len(expired) == 0 {
// Fast path -- no need to acquire a write lock if there are no
// expired entries.
continue
}
g.mu.Lock()
for j := len(expired) - 1; j >= 0; j-- {
			// Loop over expired indexes in reverse order to perform fast
			// slice element removal (swap with the last element, truncate).
i := expired[j]
if i >= len(g.peerList) {
// Technically we're the only goroutine deleting peer entries,
// but let's play it safe.
continue
}
e := g.peerList[i]
// Must re-check the expiresAt timestamp in case an update occurred
// before we could acquire the write lock.
if s.clk.Now().Before(e.expiresAt) {
continue
}
// Remove the expired index.
g.peerList[i] = g.peerList[len(g.peerList)-1]
g.peerList = g.peerList[:len(g.peerList)-1]
delete(g.peerMap, e.id)
}
g.mu.Unlock()
}
}
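
// cleanupExpiredPeerGroups deletes groups whose entries have all expired,
// marking each deleted group so concurrent getOrInitLockedPeerGroup callers
// holding a stale reference know to retry.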
func (s *LocalStore) cleanupExpiredPeerGroups() {
s.mu.Lock()
defer s.mu.Unlock()
for h, g := range s.peerGroups {
g.mu.RLock()
valid := s.clk.Now().Before(g.lastExpiresAt)
g.mu.RUnlock()
if valid {
// Fast path -- no need to acquire a write lock if the group is
// still valid.
continue
}
g.mu.Lock()
// Must re-check the lastExpiresAt timestamp in case an update
// occurred before we could acquire the write lock.
if s.clk.Now().After(g.lastExpiresAt) {
delete(s.peerGroups, h)
g.deleted = true
}
g.mu.Unlock()
}
}