pkg/discovery/routing/router.go (208 lines of code) (raw):

// Initial Copyright (c) 2023 Xenit AB and 2024 The Spegel Authors.
// Portions Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

package routing

import (
	"context"
	"fmt"
	"net"
	"sync/atomic"
	"time"

	"github.com/azure/peerd/pkg/k8s"
	"github.com/azure/peerd/pkg/k8s/election"
	"github.com/azure/peerd/pkg/k8s/events"
	"github.com/azure/peerd/pkg/peernet"
	"github.com/dgraph-io/ristretto"
	cid "github.com/ipfs/go-cid"
	"github.com/libp2p/go-libp2p"
	dht "github.com/libp2p/go-libp2p-kad-dht"
	"github.com/libp2p/go-libp2p/core/host"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/discovery/routing"
	"github.com/multiformats/go-multiaddr"
	mc "github.com/multiformats/go-multicodec"
	mh "github.com/multiformats/go-multihash"
	"github.com/rs/zerolog"
)

const (
	// MaxRecordAge is the maximum age of provider records kept by the DHT.
	// NewRouter passes it to dht.MaxRecordAge.
	MaxRecordAge = 30 * time.Minute

	// negCacheTtl is how long a key stays marked as "not found" in the lookup
	// cache once the negative-cache callback has been invoked.
	negCacheTtl = 500 * time.Millisecond

	// strPeerNotFound is the sentinel value stored in the lookup cache for keys
	// that could not be resolved.
	strPeerNotFound = "PEER_NOT_FOUND"
)

// router discovers and advertises content over a libp2p Kademlia DHT.
type router struct {
	// host is this libp2p host.
	host host.Host

	// p2pnet provides clients for downloading content from peers.
	p2pnet peernet.Network

	// content is the content discovery service (provider lookup and advertisement).
	content *routing.RoutingDiscovery

	// peerRegistryPort is the port used for the peer registry; it is combined
	// with each resolved peer's IPv4 address to build its mirror endpoint.
	peerRegistryPort string

	// lookupCache is a cache for storing the results of lookups. Currently it
	// only holds negative results (strPeerNotFound).
	lookupCache *ristretto.Cache

	// k8sClient is the k8s client, used to create event recorders.
	k8sClient *k8s.ClientSet

	// active is a flag that indicates if this host is actively discovering content on the network.
	// It is flipped to true the first time Resolve yields a peer, so the "active"
	// event is reported only once.
	active atomic.Bool
}

// Compile-time check that *router implements Router.
var _ Router = &router{}

// ContentNotFoundError indicates that the content for the given key was not found in the network.
type ContentNotFoundError struct {
	// error is the embedded underlying error describing the failure.
	error

	// key is the key that could not be resolved.
	key string
}

// NewRouter creates a new Router.
func NewRouter(ctx context.Context, clientset *k8s.ClientSet, hostAddr, peerRegistryPort string) (Router, error) { log := zerolog.Ctx(ctx).With().Str("component", "router").Logger() host, err := newHost(hostAddr) if err != nil { return nil, fmt.Errorf("could not create host: %w", err) } self := fmt.Sprintf("%s/p2p/%s", host.Addrs()[0].String(), host.ID().String()) log.Debug().Str("id", self).Msg("starting p2p router") leaderElection := election.New("peerd-leader-election", clientset) err = leaderElection.RunOrDie(ctx, self) if err != nil { return nil, err } // TODO avtakkar: reconsider the max record age for cached files. Or, ensure that the cached list is periodically advertised. dhtOpts := []dht.Option{dht.Mode(dht.ModeServer), dht.ProtocolPrefix("/peerd"), dht.DisableValues(), dht.MaxRecordAge(MaxRecordAge)} bootstrapPeerOpt := dht.BootstrapPeersFunc(func() []peer.AddrInfo { addr, err := leaderElection.Leader() if err != nil { events.FromContext(ctx).Disconnected() log.Error().Err(err).Msg("could not get leader") return nil } addrInfo, err := peer.AddrInfoFromP2pAddr(addr) if err != nil { log.Error().Err(err).Msg("could not get leader addr info") return nil } defer func() { events.FromContext(ctx).Connected() }() if addrInfo.ID == host.ID() { log.Debug().Msg("bootstrapped as leader") return nil } log.Debug().Str("leader", addrInfo.ID.String()).Msg("leader found") return []peer.AddrInfo{*addrInfo} }) dhtOpts = append(dhtOpts, bootstrapPeerOpt) kdht, err := dht.New(ctx, host, dhtOpts...) 
if err != nil { return nil, fmt.Errorf("could not create distributed hash table: %w", err) } if err = kdht.Bootstrap(ctx); err != nil { return nil, fmt.Errorf("could not boostrap distributed hash table: %w", err) } rd := routing.NewRoutingDiscovery(kdht) c, err := ristretto.NewCache(&ristretto.Config{NumCounters: 1e7, MaxCost: 1073741824, BufferItems: 64}) if err != nil { return nil, err } n, err := peernet.New(host) if err != nil { return nil, err } return &router{ k8sClient: clientset, p2pnet: n, host: host, content: rd, peerRegistryPort: peerRegistryPort, lookupCache: c, }, nil } // Transport returns the transport. func (r *router) Net() peernet.Network { return r.p2pnet } // Close closes the router. func (r *router) Close() error { return r.host.Close() } // ResolveWithNegativeCacheCallback is like Resolve but it also returns a function callback that can be used to cache that a key could not be resolved. func (r *router) ResolveWithNegativeCacheCallback(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, func(), error) { if val, ok := r.lookupCache.Get(key); ok && val.(string) == strPeerNotFound { // TODO avtakkar: currently only doing a negative cache, this could maybe become a positive cache as well. return nil, nil, ContentNotFoundError{key: key, error: fmt.Errorf("(cached) peer not found for key")} } peerCh, err := r.Resolve(ctx, key, allowSelf, count) return peerCh, func() { r.lookupCache.SetWithTTL(key, strPeerNotFound, 1, negCacheTtl) }, err } // Resolve resolves the given key to a peer address. 
func (r *router) Resolve(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, error) { log := zerolog.Ctx(ctx).With().Str("selfId", r.host.ID().String()).Str("key", key).Logger() contentId, err := createContentId(key) if err != nil { return nil, err } providersCh := r.content.FindProvidersAsync(ctx, contentId, count) peersCh := make(chan PeerInfo, count) go func() { for info := range providersCh { if !allowSelf && info.ID == r.host.ID() { continue } if len(info.Addrs) != 1 { log.Debug().Msg("expected address list to only contain a single item") continue } v, err := info.Addrs[0].ValueForProtocol(multiaddr.P_IP4) if err != nil { log.Error().Err(err).Str("peer", info.Addrs[0].String()).Msg("could not get IPV4 address") continue } // Combine peer with registry port to create mirror endpoint. peersCh <- PeerInfo{info.ID, fmt.Sprintf("https://%s:%s", v, r.peerRegistryPort)} if r.active.CompareAndSwap(false, true) { er, err := events.NewRecorder(ctx, r.k8sClient) if err != nil { log.Error().Err(err).Msg("failed to create event recorder") } else { er.Active() // Report that p2p is active. } } } }() return peersCh, nil } // Provide advertises the given keys to the network. func (r *router) Provide(ctx context.Context, keys []string) error { zerolog.Ctx(ctx).Trace().Str("host", r.host.ID().String()).Strs("keys", keys).Msg("providing keys") for _, key := range keys { contentId, err := createContentId(key) if err != nil { return err } err = r.content.Provide(ctx, contentId, true) if err != nil { return err } } return nil } // createContentId creates a deterministic content id from the given key. func createContentId(key string) (cid.Cid, error) { pref := cid.Prefix{ Version: 1, Codec: uint64(mc.Raw), MhType: mh.SHA2_256, MhLength: -1, } c, err := pref.Sum([]byte(key)) if err != nil { return cid.Cid{}, err } return c, nil } // newHost creates a new Host from the given address. 
func newHost(addr string) (host.Host, error) { h, p, err := net.SplitHostPort(addr) if err != nil { return nil, err } hostAddr, err := multiaddr.NewMultiaddr(fmt.Sprintf("/ip4/%s/tcp/%s", h, p)) if err != nil { return nil, fmt.Errorf("could not create host multi address: %w", err) } factory := libp2p.AddrsFactory(func(addrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { for _, addr := range addrs { v, err := addr.ValueForProtocol(multiaddr.P_IP4) if err != nil { continue } if v == "" { continue } if v == "127.0.0.1" { continue } return []multiaddr.Multiaddr{addr} } return nil }) return libp2p.New(libp2p.ListenAddrs(hostAddr), factory) }