in pkg/exporter/probe/flow/flow.go [119:164]
func (h *dynamicLinkFlowHelper) start() error {
h.done = make(chan struct{})
ch := make(chan netlink.LinkUpdate)
links, err := netlink.LinkList()
if err != nil {
return fmt.Errorf("%s error list link, err: %w", probeName, err)
}
for _, link := range links {
if !strings.HasPrefix(link.Attrs().Name, h.pattern) {
continue
}
h.tryStartLinkFlow(link)
}
go func() {
if err := netlink.LinkSubscribe(ch, h.done); err != nil {
log.Errorf("%s error watch link change, err: %v", probeName, err)
close(h.done)
}
}()
go func() {
h.lock.Lock()
defer h.lock.Unlock()
for {
select {
case change := <-ch:
if !strings.HasSuffix(change.Attrs().Name, h.pattern) {
break
}
switch change.Header.Type {
case syscall.RTM_NEWLINK:
link, err := netlink.LinkByIndex(int(change.Index))
if err != nil {
log.Errorf("failed get new created link by index %d, name %s, err: %v", change.Index, change.Attrs().Name, err)
break
}
h.tryStartLinkFlow(link)
case syscall.RTM_DELLINK:
h.tryStopLinkFlow(change.Attrs().Name, int(change.Index))
}
case <-h.done:
return
}
}
}()
return nil
}