in datahub/client/common/meta_data.py [0:0]
def __update_shard_meta_once(self):
    """Refresh the cached shard metadata map, allowing one refresh at a time.

    The ``_updating`` flag is switched from 0 to 1 with ``compare_and_set``
    before any work starts; a caller that cannot switch it returns
    immediately without touching any state. Once acquired, the flag is
    always switched back to 0 on exit, including when listing shards or a
    coordinator callback raises, so a failed refresh cannot block every
    later refresh.

    On success the shard list is fetched from the DataHub client, compared
    by shard id against ``_shard_meta_map``, and the timer is reset. On
    failure the timer is left untouched.

    Raises:
        DatahubException: re-raised after a warning is logged when listing
            shards (or building the shard metadata) fails.
        Exception: any other failure in that step, likewise logged and
            re-raised.
    """
    if not self._updating.compare_and_set(0, 1):
        # The flag was not 0 -- presumably another refresh is in flight.
        return

    try:
        try:
            list_shard_result = self._datahub_client.list_shard(self._topic_meta.project_name, self._topic_meta.topic_name)
            new_shard_map = {
                shard.shard_id: ShardMeta(shard.shard_id, self._endpoint, shard.state, list_shard_result.protocol)
                for shard in list_shard_result.shards
            }
        except DatahubException as e:
            self._logger.warning("Update shard meta fail. key: {}, DatahubException: {}".format(self._class_key, e))
            raise
        except Exception as e:
            self._logger.warning("Update shard meta fail. key: {}, {}".format(self._class_key, e))
            raise

        # Diff by shard id only: ids present on one side but not the other.
        new_add = [k for k in new_shard_map if k not in self._shard_meta_map]
        new_del = [k for k in self._shard_meta_map if k not in new_shard_map]
        if new_add or new_del:
            # NOTE(review): the map swap and coordinator notification are
            # treated as part of this "shard set changed" branch -- confirm
            # that this matches the intended nesting.
            self._logger.debug("Shard changed when update shard meta. key: {}, new_add: {}, new_del: {}".format(self._class_key, new_add, new_del))
            self._shard_meta_map = new_shard_map
            for coordinator in self._coordinators:
                coordinator.on_shard_meta_change(new_add, new_del)
        self._timer.reset()
        self._logger.debug("Update shard meta success. key: {}".format(self._class_key))
    finally:
        # Release the guard on every exit path; the original only released
        # it after a fully successful refresh.
        self._updating.compare_and_set(1, 0)