in datahub/client/common/meta_data.py [0:0]
def __init__(self, key, project_name, topic_name, sub_id, common_config):
    """Set up the client, topic/shard metadata and the reader or writer.

    Args:
        key: Value stored as ``_class_key``.
        project_name: Forwarded to the topic-meta initializer.
        topic_name: Forwarded to the topic-meta initializer.
        sub_id: When truthy a ``MessageReader`` is created and no writer;
            otherwise a ``MessageWriter`` is created and no reader.
        common_config: Supplies ``endpoint``, ``async_thread_limit`` and
            ``thread_queue_limit``; also handed to the client factory.
    """
    self._class_key = key
    self._logger = logging.getLogger(MetaData.__name__)
    self._endpoint = common_config.endpoint
    self._datahub_client = DatahubFactory.create_datahub_client(common_config)

    # Registered coordinators.
    self._coordinators = set()

    # Topic metadata.
    # NOTE(review): presumably relies on the client created above -- keep
    # this after the factory call.
    self._topic_meta = self.__init_topic_meta(project_name, topic_name)

    # Shard metadata: the state below is in place before the first refresh.
    self._updating = atomic.AtomicLong(0)
    self._shard_meta_map = {}
    self._timer = Timer(Constant.SHARD_META_REFRESH_TIMEOUT)
    self.__update_shard_meta_once()

    # Publish / subscribe side.
    # Clamp the configured thread limit into [MIN, MAX]; the lower bound is
    # applied last, so it wins if the two constants ever cross.
    capped_limit = min(
        common_config.async_thread_limit,
        Constant.MAX_ASYNC_THREAD_LIMIT,
    )
    worker_count = max(capped_limit, Constant.MIN_ASYNC_THREAD_LIMIT)
    queue_capacity = common_config.thread_queue_limit

    # Build into locals first so neither attribute exists on ``self`` while
    # the reader/writer constructor runs.
    if sub_id:
        reader = MessageReader(self, queue_capacity, worker_count)
        writer = None
    else:
        reader = None
        writer = MessageWriter(self, queue_capacity, worker_count)
    self._message_reader = reader
    self._message_writer = writer