in plugins/client/kafka/client_sniffer.go [28:48]
func (c *Client) snifferBrokerStatus() {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
timeTicker := time.NewTicker(time.Duration(c.RefreshPeriod) * time.Minute)
for {
select {
case <-timeTicker.C:
brokers := c.client.Brokers()
if len(brokers) == 0 && c.status == api.Connected {
c.status = api.Disconnect
c.notify()
} else if len(brokers) > 0 && c.status == api.Disconnect {
c.status = api.Connected
c.notify()
}
case <-ctx.Done():
timeTicker.Stop()
return
}
}
}