func()

in plugins/client/kafka/client_sniffer.go [28:48]


func (c *Client) snifferBrokerStatus() {
	ctx, cancel := context.WithCancel(c.ctx)
	defer cancel()
	timeTicker := time.NewTicker(time.Duration(c.RefreshPeriod) * time.Minute)
	for {
		select {
		case <-timeTicker.C:
			brokers := c.client.Brokers()
			if len(brokers) == 0 && c.status == api.Connected {
				c.status = api.Disconnect
				c.notify()
			} else if len(brokers) > 0 && c.status == api.Disconnect {
				c.status = api.Connected
				c.notify()
			}
		case <-ctx.Done():
			timeTicker.Stop()
			return
		}
	}
}