cns/nodesubnet/ip_fetcher.go (84 lines of code) (raw):

package nodesubnet

import (
	"context"
	"log"
	"net/netip"
	"time"

	"github.com/Azure/azure-container-networking/nmagent"
	"github.com/Azure/azure-container-networking/refresh"
	"github.com/pkg/errors"
)

const (
	// DefaultMinRefreshInterval is the default minimum time between secondary IP fetches.
	DefaultMinRefreshInterval = 4 * time.Second
	// DefaultMaxRefreshInterval is the default maximum time between secondary IP fetches.
	DefaultMaxRefreshInterval = 1024 * time.Second
)

// ErrRefreshSkipped is the sentinel error for a refresh that was skipped due to throttling.
var ErrRefreshSkipped = errors.New("refresh skipped due to throttling")

// InterfaceRetriever is an interface implemented by the NMAgent Client, and
// also by a mock client for testing.
type InterfaceRetriever interface {
	// GetInterfaceIPInfo returns the interface information, or an error.
	GetInterfaceIPInfo(ctx context.Context) (nmagent.Interfaces, error)
}

// IPConsumer is an interface implemented by whoever consumes the secondary IPs
// fetched in nodesubnet.
type IPConsumer interface {
	// UpdateIPsForNodeSubnet receives the list of secondary IPs that was fetched.
	UpdateIPsForNodeSubnet([]netip.Addr) error
}

// IPFetcher fetches secondary IPs from NMAgent at regular intervals. The
// interval will vary within the range of minRefreshInterval and
// maxRefreshInterval. When no diff is observed after a fetch, the interval
// doubles (subject to the maximum interval). When a diff is observed, the
// interval resets to the minimum.
type IPFetcher struct {
	// intfFetcherClient is the client used to retrieve interface information.
	intfFetcherClient InterfaceRetriever
	// consumer receives the secondary IPs extracted from each fetched response.
	consumer IPConsumer
	// fetcher is the underlying refresh.Fetcher that Start delegates to.
	fetcher *refresh.Fetcher[nmagent.Interfaces]
}

// NewIPFetcher creates a new IPFetcher. If minInterval is 0, it defaults to
// DefaultMinRefreshInterval (4 seconds). If maxInterval is 0, it defaults to
// DefaultMaxRefreshInterval (1024 seconds). In all cases maxInterval is raised
// to minInterval if minInterval is higher.
func NewIPFetcher( client InterfaceRetriever, consumer IPConsumer, minInterval time.Duration, maxInterval time.Duration, logger refresh.Logger, ) *IPFetcher { if minInterval == 0 { minInterval = DefaultMinRefreshInterval } if maxInterval == 0 { maxInterval = DefaultMaxRefreshInterval } maxInterval = max(maxInterval, minInterval) newIPFetcher := &IPFetcher{ intfFetcherClient: client, consumer: consumer, fetcher: nil, } fetcher := refresh.NewFetcher[nmagent.Interfaces](client.GetInterfaceIPInfo, minInterval, maxInterval, newIPFetcher.ProcessInterfaces, logger) newIPFetcher.fetcher = fetcher return newIPFetcher } // Start the IPFetcher. func (c *IPFetcher) Start(ctx context.Context) { c.fetcher.Start(ctx) } // Fetch IPs from NMAgent and pass to the consumer func (c *IPFetcher) ProcessInterfaces(response nmagent.Interfaces) error { if len(response.Entries) == 0 { return errors.New("no interfaces found in response from NMAgent") } _, secondaryIPs := flattenIPListFromResponse(&response) err := c.consumer.UpdateIPsForNodeSubnet(secondaryIPs) if err != nil { return errors.Wrap(err, "updating secondary IPs") } return nil } // Get the list of secondary IPs from fetched Interfaces func flattenIPListFromResponse(resp *nmagent.Interfaces) (primary netip.Addr, secondaryIPs []netip.Addr) { var primaryIP netip.Addr // For each interface... for _, intf := range resp.Entries { if !intf.IsPrimary { continue } // For each subnet on the interface... for _, s := range intf.InterfaceSubnets { addressCount := 0 // For each address in the subnet... for _, a := range s.IPAddress { // Primary addresses are reserved for the host. if a.IsPrimary { primaryIP = netip.Addr(a.Address) continue } secondaryIPs = append(secondaryIPs, netip.Addr(a.Address)) addressCount++ } log.Printf("Got %d addresses from subnet %s", addressCount, s.Prefix) } } return primaryIP, secondaryIPs }