in datahub/client/consumer/consumer_coordinator.py [0:0]
def __sync_group(self):
    """Send a SyncGroup request for the pending shard changes, if any.

    Returns immediately when there is no sync-group metadata or when the
    metadata reports that no sync is needed. Otherwise snapshots of the
    release shards and read-end shards are sent to the server through the
    datahub client, and ``clear_shard_release()`` is called on the metadata
    once the request has succeeded.

    Raises:
        DatahubException: logged as a warning and then re-raised when the
            sync (or the bookkeeping that follows it) fails.
    """
    # Guard clause: nothing to do without metadata or without pending changes.
    if not self._sync_group_meta or not self._sync_group_meta.need_sync_group():
        return

    # Copy both collections up front so the request and the log lines
    # report the same snapshot.
    released_shards = list(self._sync_group_meta.release_shards)
    finished_shards = list(self._sync_group_meta.read_end_shards)

    try:
        client = self._meta_data.datahub_client
        client.sync_group(
            self._project_name,
            self._topic_name,
            self._sub_id,
            self._consumer_id,
            self._version_id,
            released_shards,
            finished_shards
        )
        self._sync_group_meta.clear_shard_release()
        success_msg = "SyncGroup success. key: {}, release: {}, read end: {}".format(
            self._uniq_key, released_shards, finished_shards)
        self._logger.debug(success_msg)
    except DatahubException as err:
        failure_msg = "SyncGroup fail. key: {}, release: {}, read end: {}. {}".format(
            self._uniq_key, released_shards, finished_shards, err)
        self._logger.warning(failure_msg)
        raise err