func()

in pkg/exporter/probe/flow/flow.go [119:164]


// start begins tracking links whose name starts with h.pattern.
//
// It first lists the existing links and calls tryStartLinkFlow for every
// match. It then launches two goroutines: one subscribes to netlink link
// updates on an internal channel, the other consumes those updates, calling
// tryStartLinkFlow on RTM_NEWLINK and tryStopLinkFlow on RTM_DELLINK. The
// consumer exits when h.done is closed or when the update channel is closed.
//
// An error is returned only when the initial link listing fails. A failed
// subscription is logged and stops the consumer by closing h.done.
func (h *dynamicLinkFlowHelper) start() error {
	h.done = make(chan struct{})
	ch := make(chan netlink.LinkUpdate)
	links, err := netlink.LinkList()
	if err != nil {
		return fmt.Errorf("%s error list link, err: %w", probeName, err)
	}
	for _, link := range links {
		if !strings.HasPrefix(link.Attrs().Name, h.pattern) {
			continue
		}
		h.tryStartLinkFlow(link)
	}
	go func() {
		if err := netlink.LinkSubscribe(ch, h.done); err != nil {
			log.Errorf("%s error watch link change, err: %v", probeName, err)
			// Without a subscription nothing will ever arrive on ch, so
			// release the consumer goroutine below.
			// NOTE(review): if another code path also closes h.done this
			// would be a double close — verify against the stop logic.
			close(h.done)
		}
	}()
	go func() {
		// NOTE(review): the lock is held for the whole lifetime of this
		// goroutine, so anything else acquiring h.lock blocks until the
		// watcher exits — confirm this is intended.
		h.lock.Lock()
		defer h.lock.Unlock()
		for {
			select {
			case change, ok := <-ch:
				if !ok {
					// The subscription closed the channel. A zero-value
					// LinkUpdate carries a nil Link, so calling Attrs() on
					// it would panic; stop watching instead.
					select {
					case <-h.done:
						// Regular shutdown, nothing to report.
					default:
						log.Errorf("%s link update channel closed unexpectedly, stop watching link change", probeName)
					}
					return
				}
				// Match by prefix, the same rule the initial listing uses.
				if !strings.HasPrefix(change.Attrs().Name, h.pattern) {
					continue
				}
				switch change.Header.Type {
				case syscall.RTM_NEWLINK:
					link, err := netlink.LinkByIndex(int(change.Index))
					if err != nil {
						log.Errorf("failed get new created link by index %d, name %s, err: %v", change.Index, change.Attrs().Name, err)
						break
					}
					h.tryStartLinkFlow(link)
				case syscall.RTM_DELLINK:
					h.tryStopLinkFlow(change.Attrs().Name, int(change.Index))
				}
			case <-h.done:
				return
			}
		}
	}()
	return nil
}