func()

in pkg/client/mq/kafka_facade.go [154:194]


// checkConsumerIsAlive periodically probes checkUrl and tears down the consumer
// registered under key once the endpoint stops answering with 200 OK.
//
// Every 15 seconds it issues a GET against checkUrl, retrying up to 5 times
// with a 10ms pause between attempts. If no attempt yields http.StatusOK, the
// function stored in f.consumerManager[key] is invoked, the entry is removed,
// and the checker exits. The checker also exits as soon as f.done is signaled.
//
// It calls f.wg.Done() on return, so the caller is expected to have called
// f.wg.Add(1) before starting it.
//
// ctx is currently unused; it is kept so the signature stays unchanged.
//
// NOTE(review): f.consumerManager is read and mutated here without any
// visible synchronization — confirm callers serialize access to the map.
func (f *KafkaConsumerFacade) checkConsumerIsAlive(ctx context.Context, key string, checkUrl string) {
	defer f.wg.Done()

	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	// probe performs a single health-check request and returns nil only when
	// the endpoint replied with 200 OK.
	probe := func() error {
		req, err := http.NewRequest(http.MethodGet, checkUrl, bytes.NewReader([]byte{}))
		if err != nil {
			return perrors.New("failed to check consumer alive due to: " + err.Error())
		}
		resp, err := f.httpClient.Do(req)
		if err != nil {
			return perrors.New("failed to check consumer alive due to: " + err.Error())
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return perrors.New("failed check consumer alive or not with status code " + strconv.Itoa(resp.StatusCode))
		}
		return nil
	}

	for {
		select {
		case <-f.done:
			// Must return here: a closed channel is always ready, so staying in
			// the loop would spin forever and never release the WaitGroup.
			return
		case <-ticker.C:
			alive := false
			for i := 0; i < 5; i++ {
				if err := probe(); err == nil {
					alive = true
					break
				}
				time.Sleep(10 * time.Millisecond)
			}
			if alive {
				continue
			}

			// Guard the lookup: calling a missing (nil) entry would panic.
			if stop, ok := f.consumerManager[key]; ok {
				stop()
				delete(f.consumerManager, key)
			}
			// The consumer is gone, so there is nothing left to monitor.
			return
		}
	}
}