in pkg/discovery/routing/router.go [172:214]
// Resolve looks up providers for key and streams them to the caller as
// mirror endpoints.
//
// key is converted to a content ID with createContentId; a conversion failure
// is returned directly and no goroutine is started. Otherwise providers are
// requested from r.content.FindProvidersAsync (passing count through) and a
// background goroutine forwards each usable provider on the returned channel,
// whose buffer size is count.
//
// A provider is skipped when:
//   - allowSelf is false and the provider ID equals this host's ID,
//   - its address list does not contain exactly one address, or
//   - that address has no IPv4 component.
//
// Each forwarded PeerInfo carries the provider ID and an endpoint of the form
// "https://<ipv4>:<peerRegistryPort>". After the first successful forward the
// router flips r.active and reports activity through an event recorder;
// recorder creation failures are only logged.
//
// The send on the returned channel is abandoned once ctx is done, so the
// goroutine cannot block forever on a receiver that has gone away.
//
// NOTE(review): the returned channel is never closed, so receivers must rely
// on ctx (or their own timeout) to stop waiting. Closing it when the provider
// stream ends would be the idiomatic contract, but that changes what callers
// observe on receive — confirm against the callers before changing.
func (r *router) Resolve(ctx context.Context, key string, allowSelf bool, count int) (<-chan PeerInfo, error) {
	log := zerolog.Ctx(ctx).With().Str("selfId", r.host.ID().String()).Str("key", key).Logger()
	contentId, err := createContentId(key)
	if err != nil {
		return nil, err
	}
	providersCh := r.content.FindProvidersAsync(ctx, contentId, count)
	peersCh := make(chan PeerInfo, count)
	go func() {
		// The host ID is loop-invariant; look it up once.
		selfID := r.host.ID()
		for info := range providersCh {
			if !allowSelf && info.ID == selfID {
				continue
			}
			if len(info.Addrs) != 1 {
				log.Debug().Msg("expected address list to only contain a single item")
				continue
			}
			v, err := info.Addrs[0].ValueForProtocol(multiaddr.P_IP4)
			if err != nil {
				log.Error().Err(err).Str("peer", info.Addrs[0].String()).Msg("could not get IPV4 address")
				continue
			}
			// Combine peer with registry port to create mirror endpoint.
			// Guard the send with ctx so that a receiver which stopped reading
			// (e.g. after cancellation) cannot strand this goroutine.
			select {
			case peersCh <- PeerInfo{info.ID, fmt.Sprintf("https://%s:%s", v, r.peerRegistryPort)}:
			case <-ctx.Done():
				return
			}
			// Only the first goroutine to observe a forwarded peer reports activity.
			if r.active.CompareAndSwap(false, true) {
				er, err := events.NewRecorder(ctx, r.k8sClient)
				if err != nil {
					log.Error().Err(err).Msg("failed to create event recorder")
				} else {
					er.Active() // Report that p2p is active.
				}
			}
		}
	}()
	return peersCh, nil
}