in pkg/client/mq/kafka_facade.go [154:194]
// checkConsumerIsAlive polls checkUrl every 15 seconds and tears down the
// consumer registered under key once the endpoint stops answering 200 OK.
//
// Each poll makes up to 5 attempts, 10ms apart; the first 200 OK ends the
// poll. When every attempt fails, the function stored in f.consumerManager
// under key is invoked, the entry is removed, and this goroutine exits. The
// goroutine also exits as soon as f.done becomes readable. f.wg.Done is
// called on every exit path.
//
// ctx is accepted but not consulted by this function.
//
// NOTE(review): f.consumerManager is read and modified here without any
// locking visible in this function — confirm that access is serialized
// elsewhere.
func (f *KafkaConsumerFacade) checkConsumerIsAlive(ctx context.Context, key string, checkUrl string) {
	defer f.wg.Done()

	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	// probe issues a single health-check request. A nil result means the
	// endpoint answered 200 OK; any other outcome is reported as an error.
	probe := func() error {
		req, err := http.NewRequest(http.MethodGet, checkUrl, bytes.NewReader([]byte{}))
		if err != nil {
			return perrors.New("failed to check consumer alive due to: " + err.Error())
		}
		resp, err := f.httpClient.Do(req)
		if err != nil {
			// Handle the transport error right here: falling through to a
			// shared error return would dereference the (nil) request error.
			return perrors.New("failed to check consumer alive due to: " + err.Error())
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return perrors.New("failed check consumer alive or not with status code " + strconv.Itoa(resp.StatusCode))
		}
		return nil
	}

	for {
		select {
		case <-f.done:
			// Shutdown requested: leave the loop so wg.Done runs instead of
			// spinning on the already-readable channel.
			return
		case <-ticker.C:
			alive := false
			for i := 0; i < 5; i++ {
				// The error is only used as a retry signal.
				if err := probe(); err == nil {
					alive = true
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
			if alive {
				continue
			}

			// Guard against a missing entry: calling a nil func would panic.
			if cancel := f.consumerManager[key]; cancel != nil {
				cancel()
			}
			delete(f.consumerManager, key)
			// The consumer entry is gone; nothing is left to watch.
			return
		}
	}
}