in datahub/client/consumer/consumer_coordinator.py [0:0]
def __join_group(self):
timer = Timer(Constant.MAX_JOIN_GROUP_TIMEOUT)
while not timer.is_expired():
try:
join_result = self._meta_data.datahub_client.join_group(
self._project_name,
self._topic_name,
self._sub_id,
self._session_timeout
)
self._consumer_id = join_result.consumer_id
self._version_id = join_result.version_id
self._session_timeout = join_result.session_timeout
self._gen_uniq_key(self._consumer_id)
self._logger.info("JoinGroup success. key: {}, consumer id: {}, version id: {}, session timeout: {}"
.format(self._uniq_key, self._consumer_id, self._version_id, self._session_timeout))
return
except SubscriptionOfflineException as e:
self._logger.warning("JoinGroup fail, subscription offline. key:{}. {}".format(self._uniq_key, e))
raise e
except DatahubException as e:
self._logger.warning("JoinGroup fail. retry again. key:{}. {}".format(self._uniq_key, e))
try:
timer.wait_expire(1)
except Exception as e:
raise e
raise TimeoutException("JoinGroup timeout. key: {}, elapsedMs: {}".format(self._uniq_key, timer.elapse()))