def __keep_heartbeat()

in datahub/client/consumer/consumer_heartbeat.py [0:0]


    def __keep_heartbeat(self):
        """Drive the consumer heartbeat loop until closed or until waiting fails.

        Logs a start message, then loops while ``self._closed`` is falsy:

        * If the timer has not expired yet, block in ``self._timer.wait_expire()``.
          Any ``Exception`` raised by that wait is logged as a warning and ends
          the loop.
        * If the timer has expired, call ``self.__heartbeat_once()`` and re-arm
          the timer: with ``self._heartbeat_timeout`` when
          ``self._sync_group_meta.get_valid_shards()`` is truthy, otherwise
          (after logging a warning) with
          ``Constant.MIN_HEARTBEAT_INTERVAL_TIMEOUT``.

        A stop message is logged once the loop exits. Note that the stop
        message reports ``self._timer.timeout`` in its heartbeat-timeout slot,
        whereas the start message reports ``self._heartbeat_timeout``.
        """
        start_template = "ConsumerHeartbeat task start. key: {}, session timeout: {}, heartbeat timeout: {}"
        self._logger.info(
            start_template.format(self._coordinator.uniq_key, self._session_timeout, self._heartbeat_timeout)
        )

        while not self._closed:
            # Not due yet: sleep on the timer, bailing out if the wait itself fails.
            if not self._timer.is_expired():
                try:
                    self._timer.wait_expire()
                except Exception as wait_error:
                    self._logger.warning("ConsumerHeartbeat stop. {}".format(wait_error))
                    break
                continue

            # Due: send one heartbeat, then decide how long to wait for the next.
            self.__heartbeat_once()
            if not self._sync_group_meta.get_valid_shards():
                # No valid shards yet, so re-arm with the minimum interval constant.
                self._logger.warning(
                    "Heartbeat has not assign consumer plan, please wait. key:{}".format(self._coordinator.uniq_key)
                )
                self._timer.reset(Constant.MIN_HEARTBEAT_INTERVAL_TIMEOUT)
                continue
            self._timer.reset(self._heartbeat_timeout)

        stop_template = "ConsumerHeartbeat task stop. key:{}, sessionTimeoutMs:{}, heartbeatTimeoutMs:{}"
        self._logger.info(
            stop_template.format(self._coordinator.uniq_key, self._session_timeout, self._timer.timeout)
        )