def __commit_offset_task()

in datahub/client/consumer/offset_manager.py [0:0]


    def __commit_offset_task(self):
        """Run the periodic offset-commit loop until the manager is closed.

        While ``self._closed`` is false, each iteration either:

        * commits: when the timer has expired, offsets are synced and
          committed under ``self._lock`` and the timer is reset; or
        * waits: blocks on the timer for at most
          ``Constant.OFFSET_CHECK_TIMEOUT``; any error raised while waiting
          is logged and ends the loop.

        Known commit failures are logged and, where a matching hook exists,
        forwarded to the coordinator (session changed, subscription offline,
        subscription deleted). After the loop ends normally, one last
        sync + commit is performed under the lock.

        Presumably executed on a dedicated background thread -- confirm
        against the caller.

        Raises:
            Exception: any commit error that is not one of the handled
                types is logged and re-raised; errors from the final
                sync/commit also propagate. The "task stop" message is
                logged in every case.
        """
        self._logger.info("Offset commit task start. key: {}".format(self._uniq_key))
        try:
            while not self._closed:
                if self._timer.is_expired():
                    # NOTE(review): the timer is only reset on success, so after
                    # a handled failure the next iteration retries immediately.
                    # Confirm this is intended (vs. backing off one interval).
                    try:
                        with self._lock:
                            self.__sync_offsets()
                            self.__commit_offsets()
                            self._timer.reset()
                    except OffsetResetException as e:
                        self._logger.warning("CommitOffset fail, subscription offset reset. key:{}. last offset map: {}. {}".format(
                            self._uniq_key, self._last_offset_map, e))
                    except InvalidOperationException as e:
                        self._logger.warning("CommitOffset fail, subscription session invalid. key:{}. {}".format(self._uniq_key, e))
                        self._coordinator.on_sub_session_changed()
                    except SubscriptionOfflineException as e:
                        self._logger.warning("CommitOffset fail, subscription offline. key:{}. {}".format(self._uniq_key, e))
                        self._coordinator.on_sub_offline()
                    except ResourceNotFoundException as e:
                        if "NoSuchSubscription" in e.error_code:
                            self._logger.warning("CommitOffset fail, subscription deleted. key:{}. {}".format(self._uniq_key, e))
                            self._coordinator.on_sub_deleted()
                        else:
                            # Some other resource is missing; do not label it
                            # as NoSuchSubscription in the log.
                            self._logger.warning("CommitOffset fail, resource not found. key:{}. {}".format(self._uniq_key, e))
                    except Exception as e:
                        self._logger.warning("CommitOffset fail. key:{}. {}".format(self._uniq_key, e))
                        # Bare raise re-raises the active exception unchanged.
                        raise
                else:
                    try:
                        self._timer.wait_expire(Constant.OFFSET_CHECK_TIMEOUT)
                    except Exception as e:
                        self._logger.warning("OffsetCommitTask interrupt occur. key: {}, {}".format(self._uniq_key, e))
                        break
            # Final flush so offsets recorded since the last periodic commit
            # are not lost when the loop exits.
            with self._lock:
                self.__sync_offsets()
                self.__commit_offsets()
        finally:
            # Always record termination, including when an error propagates.
            self._logger.info("Offset commit task stop. key: {}".format(self._uniq_key))