in consumer_group.go [994:1065]
func (s *consumerGroupSession) heartbeatLoop() {
defer close(s.hbDead)
defer s.cancel() // trigger the end of the session on exit
defer func() {
Logger.Printf(
"consumergroup/session/%s/%d heartbeat loop stopped\n",
s.MemberID(), s.GenerationID())
}()
pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
defer pause.Stop()
retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
defer retryBackoff.Stop()
retries := s.parent.config.Metadata.Retry.Max
for {
coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
if err != nil {
if retries <= 0 {
s.parent.handleError(err, "", -1)
return
}
retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
select {
case <-s.hbDying:
return
case <-retryBackoff.C:
retries--
}
continue
}
resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
if err != nil {
_ = coordinator.Close()
if retries <= 0 {
s.parent.handleError(err, "", -1)
return
}
retries--
continue
}
switch resp.Err {
case ErrNoError:
retries = s.parent.config.Metadata.Retry.Max
case ErrRebalanceInProgress:
retries = s.parent.config.Metadata.Retry.Max
s.cancel()
case ErrUnknownMemberId, ErrIllegalGeneration:
return
case ErrFencedInstancedId:
if s.parent.groupInstanceId != nil {
Logger.Printf("JoinGroup failed: group instance id %s has been fenced\n", *s.parent.groupInstanceId)
}
s.parent.handleError(resp.Err, "", -1)
return
default:
s.parent.handleError(resp.Err, "", -1)
return
}
select {
case <-pause.C:
case <-s.hbDying:
return
}
}
}