in pkg/discovery/routing/router.go [71:146]
func NewRouter(ctx context.Context, clientset *k8s.ClientSet, hostAddr, peerRegistryPort string) (Router, error) {
log := zerolog.Ctx(ctx).With().Str("component", "router").Logger()
host, err := newHost(hostAddr)
if err != nil {
return nil, fmt.Errorf("could not create host: %w", err)
}
self := fmt.Sprintf("%s/p2p/%s", host.Addrs()[0].String(), host.ID().String())
log.Debug().Str("id", self).Msg("starting p2p router")
leaderElection := election.New("peerd-leader-election", clientset)
err = leaderElection.RunOrDie(ctx, self)
if err != nil {
return nil, err
}
// TODO avtakkar: reconsider the max record age for cached files. Or, ensure that the cached list is periodically advertised.
dhtOpts := []dht.Option{dht.Mode(dht.ModeServer), dht.ProtocolPrefix("/peerd"), dht.DisableValues(), dht.MaxRecordAge(MaxRecordAge)}
bootstrapPeerOpt := dht.BootstrapPeersFunc(func() []peer.AddrInfo {
addr, err := leaderElection.Leader()
if err != nil {
events.FromContext(ctx).Disconnected()
log.Error().Err(err).Msg("could not get leader")
return nil
}
addrInfo, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil {
log.Error().Err(err).Msg("could not get leader addr info")
return nil
}
defer func() {
events.FromContext(ctx).Connected()
}()
if addrInfo.ID == host.ID() {
log.Debug().Msg("bootstrapped as leader")
return nil
}
log.Debug().Str("leader", addrInfo.ID.String()).Msg("leader found")
return []peer.AddrInfo{*addrInfo}
})
dhtOpts = append(dhtOpts, bootstrapPeerOpt)
kdht, err := dht.New(ctx, host, dhtOpts...)
if err != nil {
return nil, fmt.Errorf("could not create distributed hash table: %w", err)
}
if err = kdht.Bootstrap(ctx); err != nil {
return nil, fmt.Errorf("could not boostrap distributed hash table: %w", err)
}
rd := routing.NewRoutingDiscovery(kdht)
c, err := ristretto.NewCache(&ristretto.Config{NumCounters: 1e7, MaxCost: 1073741824, BufferItems: 64})
if err != nil {
return nil, err
}
n, err := peernet.New(host)
if err != nil {
return nil, err
}
return &router{
k8sClient: clientset,
p2pnet: n,
host: host,
content: rd,
peerRegistryPort: peerRegistryPort,
lookupCache: c,
}, nil
}