in datahub/client/consumer/offset_manager.py [0:0]
def __commit_offsets(self):
    """Commit the pending offsets held in ``_last_offset_map``.

    When the map is empty nothing is sent. Otherwise the whole map is
    handed to the coordinator's DataHub client through
    ``update_subscription_offset`` together with the coordinator's project
    name, topic name and subscription id. After a successful call the
    commit is logged at INFO level and the map is cleared.

    Raises:
        DatahubException: re-raised unchanged after a warning is logged.
        Exception: any other failure, likewise logged and re-raised.
    """
    try:
        if self._last_offset_map:
            coordinator = self._coordinator
            coordinator.meta_data.datahub_client.update_subscription_offset(
                coordinator.project_name,
                coordinator.topic_name,
                coordinator.sub_id,
                self._last_offset_map
            )
            # NOTE(review): the label says "min offset" but the value comes
            # from __get_min_timestamp() -- confirm which one is intended.
            # The log is emitted before clear(); presumably the helper reads
            # the pending map, so keep this order.
            self._logger.info("Commit offset success. key: {}, min offset = {}".format(self._uniq_key, self.__get_min_timestamp()))
            self._last_offset_map.clear()
    except DatahubException as e:
        self._logger.warning("Commit offset fail. key: {}, min offset = {}, DatahubException: {}".format(self._uniq_key, self.__get_min_timestamp(), e))
        # Bare raise propagates the same exception with its original
        # traceback; `raise e` would restart the traceback on Python 2.
        raise
    except Exception as e:
        self._logger.warning("Commit offset fail. key: {}, min offset = {}, {}".format(self._uniq_key, self.__get_min_timestamp(), e))
        # The map is left untouched on failure, so the offsets stay pending.
        raise