def __join_group()

in datahub/client/consumer/consumer_coordinator.py [0:0]


    def __join_group(self):
        timer = Timer(Constant.MAX_JOIN_GROUP_TIMEOUT)
        while not timer.is_expired():
            try:
                join_result = self._meta_data.datahub_client.join_group(
                    self._project_name,
                    self._topic_name,
                    self._sub_id,
                    self._session_timeout
                )
                self._consumer_id = join_result.consumer_id
                self._version_id = join_result.version_id
                self._session_timeout = join_result.session_timeout
                self._gen_uniq_key(self._consumer_id)
                self._logger.info("JoinGroup success. key: {}, consumer id: {}, version id: {}, session timeout: {}"
                                  .format(self._uniq_key, self._consumer_id, self._version_id, self._session_timeout))
                return
            except SubscriptionOfflineException as e:
                self._logger.warning("JoinGroup fail, subscription offline. key:{}. {}".format(self._uniq_key, e))
                raise e
            except DatahubException as e:
                self._logger.warning("JoinGroup fail. retry again. key:{}. {}".format(self._uniq_key, e))

            try:
                timer.wait_expire(1)
            except Exception as e:
                raise e

        raise TimeoutException("JoinGroup timeout. key: {}, elapsedMs: {}".format(self._uniq_key, timer.elapse()))