in datahub/client/consumer/offset_manager.py [0:0]
def __commit_offset_task(self):
self._logger.info("Offset commit task start. key: {}".format(self._uniq_key))
while not self._closed:
if self._timer.is_expired():
try:
with self._lock:
self.__sync_offsets()
self.__commit_offsets()
self._timer.reset()
except OffsetResetException as e:
self._logger.warning("CommitOffset fail, subscription offset reset. key:{}. last offset map: {}. {}".format(
self._uniq_key, self._last_offset_map, e))
except InvalidOperationException as e:
self._logger.warning("CommitOffset fail, subscription session invalid. key:{}. {}".format(self._uniq_key, e))
self._coordinator.on_sub_session_changed()
except SubscriptionOfflineException as e:
self._logger.warning("CommitOffset fail, subscription offline. key:{}. {}".format(self._uniq_key, e))
self._coordinator.on_sub_offline()
except ResourceNotFoundException as e:
if "NoSuchSubscription" in e.error_code:
self._logger.warning("CommitOffset fail, subscription deleted. key:{}. {}".format(self._uniq_key, e))
self._coordinator.on_sub_deleted()
else:
self._logger.warning("CommitOffset fail. key:{}. NoSuchSubscription: {}".format(self._uniq_key, e))
except Exception as e:
self._logger.warning("CommitOffset fail. key:{}. {}".format(self._uniq_key, e))
raise e
else:
try:
self._timer.wait_expire(Constant.OFFSET_CHECK_TIMEOUT)
except Exception as e:
self._logger.warning("OffsetCommitTask interrupt occur. key: {}, {}".format(self._uniq_key, e))
break
with self._lock:
self.__sync_offsets()
self.__commit_offsets()
self._logger.info("Offset commit task stop. key: {}".format(self._uniq_key))