tracker/peerstore/redis.go (146 lines of code) (raw):

// Copyright (c) 2016-2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package peerstore import ( "errors" "fmt" "strconv" "strings" "github.com/uber/kraken/core" "github.com/uber/kraken/utils/log" "github.com/uber/kraken/utils/randutil" "github.com/andres-erbsen/clock" "github.com/garyburd/redigo/redis" ) func peerSetKey(h core.InfoHash, window int64) string { return fmt.Sprintf("peerset:%s:%d", h.String(), window) } func serializePeer(p *core.PeerInfo) string { var completeBit int if p.Complete { completeBit = 1 } return fmt.Sprintf("%s:%s:%d:%d", p.PeerID.String(), p.IP, p.Port, completeBit) } type peerIdentity struct { peerID core.PeerID ip string port int } func deserializePeer(s string) (id peerIdentity, complete bool, err error) { parts := strings.Split(s, ":") if len(parts) != 4 { return id, false, fmt.Errorf("invalid peer encoding: expected 'pid:ip:port:complete'") } peerID, err := core.NewPeerID(parts[0]) if err != nil { return id, false, fmt.Errorf("parse peer id: %s", err) } ip := parts[1] port, err := strconv.Atoi(parts[2]) if err != nil { return id, false, fmt.Errorf("parse port: %s", err) } id = peerIdentity{peerID, ip, port} complete = parts[3] == "1" return id, complete, nil } // RedisStore is a Store backed by Redis. type RedisStore struct { config RedisConfig pool *redis.Pool clk clock.Clock } // NewRedisStore creates a new RedisStore. func NewRedisStore(config RedisConfig, clk clock.Clock) (*RedisStore, error) { config.applyDefaults() if config.Addr == "" { return nil, errors.New("invalid config: missing addr") } s := &RedisStore{ config: config, pool: &redis.Pool{ Dial: func() (redis.Conn, error) { // TODO Add options return redis.Dial( "tcp", config.Addr, redis.DialConnectTimeout(config.DialTimeout), redis.DialReadTimeout(config.ReadTimeout), redis.DialWriteTimeout(config.WriteTimeout)) }, MaxIdle: config.MaxIdleConns, MaxActive: config.MaxActiveConns, IdleTimeout: config.IdleConnTimeout, Wait: true, }, clk: clk, } // Ensure we can connect to Redis. c, err := s.pool.Dial() if err != nil { return nil, fmt.Errorf("dial redis: %s", err) } c.Close() return s, nil } // Close implements Store. func (s *RedisStore) Close() {} func (s *RedisStore) curPeerSetWindow() int64 { t := s.clk.Now().Unix() return t - (t % int64(s.config.PeerSetWindowSize.Seconds())) } func (s *RedisStore) peerSetWindows() []int64 { cur := s.curPeerSetWindow() ws := make([]int64, s.config.MaxPeerSetWindows) for i := range ws { ws[i] = cur - int64(i)*int64(s.config.PeerSetWindowSize.Seconds()) } return ws } // UpdatePeer writes p to Redis with a TTL. func (s *RedisStore) UpdatePeer(h core.InfoHash, p *core.PeerInfo) error { c := s.pool.Get() defer c.Close() w := s.curPeerSetWindow() expireAt := w + int64(s.config.PeerSetWindowSize.Seconds())*int64(s.config.MaxPeerSetWindows) // Add p to the current window. k := peerSetKey(h, w) if err := c.Send("SADD", k, serializePeer(p)); err != nil { return fmt.Errorf("send SADD: %s", err) } if err := c.Send("EXPIREAT", k, expireAt); err != nil { return fmt.Errorf("send EXPIREAT: %s", err) } if err := c.Flush(); err != nil { return fmt.Errorf("flush: %s", err) } if _, err := c.Receive(); err != nil { return fmt.Errorf("SADD: %s", err) } if _, err := c.Receive(); err != nil { return fmt.Errorf("EXPIREAT: %s", err) } return nil } // GetPeers returns at most n PeerInfos associated with h. func (s *RedisStore) GetPeers(h core.InfoHash, n int) ([]*core.PeerInfo, error) { c := s.pool.Get() defer c.Close() // Try to sample n peers from each window in randomized order until we have // collected n distinct peers. This achieves random sampling across multiple // windows. // TODO(codyg): One limitation of random window sampling is we're no longer // guaranteed to include the latest completion bits. A simple way to mitigate // this is to decrease the number of windows. windows := s.peerSetWindows() randutil.ShuffleInt64s(windows) // Eliminate duplicates from other windows and collapses complete bits. selected := make(map[peerIdentity]bool) for i := 0; len(selected) < n && i < len(windows); i++ { k := peerSetKey(h, windows[i]) result, err := redis.Strings(c.Do("SRANDMEMBER", k, n-len(selected))) if err == redis.ErrNil { continue } else if err != nil { return nil, err } for _, s := range result { id, complete, err := deserializePeer(s) if err != nil { log.Errorf("Error deserializing peer %q: %s", s, err) continue } selected[id] = selected[id] || complete } } var peers []*core.PeerInfo for id, complete := range selected { p := core.NewPeerInfo(id.peerID, id.ip, id.port, false, complete) peers = append(peers, p) } return peers, nil }