in datahub/client/consumer/consumer_heartbeat.py [0:0]
def __keep_heartbeat(self):
    """Drive the heartbeat loop until the consumer is closed or waiting fails.

    Each pass through the loop does one of two things:

    * If the timer has expired, send one heartbeat and re-arm the timer.
      The timer is re-armed with the regular heartbeat timeout when the
      synced group meta reports valid shards, and with
      ``Constant.MIN_HEARTBEAT_INTERVAL_TIMEOUT`` (after logging a warning)
      when it does not.
    * Otherwise, block on the timer until it expires. Any exception raised
      while waiting is logged as a warning and ends the loop.

    A log line is emitted when the task starts and again when it stops.
    """
    start_message = (
        "ConsumerHeartbeat task start. key: {}, session timeout: {}, heartbeat timeout: {}"
    ).format(self._coordinator.uniq_key, self._session_timeout, self._heartbeat_timeout)
    self._logger.info(start_message)

    while not self._closed:
        # Not due yet: sleep on the timer, then re-evaluate the loop condition.
        if not self._timer.is_expired():
            try:
                self._timer.wait_expire()
            except Exception as err:
                self._logger.warning("ConsumerHeartbeat stop. {}".format(err))
                break
            continue

        # Due: send a heartbeat, then decide how long to wait for the next one.
        self.__heartbeat_once()
        if self._sync_group_meta.get_valid_shards():
            next_timeout = self._heartbeat_timeout
        else:
            # No valid shards reported yet, so use the minimum interval constant.
            self._logger.warning(
                "Heartbeat has not assign consumer plan, please wait. key:{}".format(
                    self._coordinator.uniq_key
                )
            )
            next_timeout = Constant.MIN_HEARTBEAT_INTERVAL_TIMEOUT
        self._timer.reset(next_timeout)

    # NOTE(review): this reports the timer's current timeout, not
    # self._heartbeat_timeout as the start message does — confirm intended.
    stop_message = (
        "ConsumerHeartbeat task stop. key:{}, sessionTimeoutMs:{}, heartbeatTimeoutMs:{}"
    ).format(self._coordinator.uniq_key, self._session_timeout, self._timer.timeout)
    self._logger.info(stop_message)