protected void subscribe()

in ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java [153:178]


    protected void subscribe(final String topic, final MessageSelector selector) {
        String subExpression = "*";
        String type = org.apache.rocketmq.common.filter.ExpressionType.TAG;
        if (selector != null) {
            if (selector.getType() == null) {
                throw new ONSClientException("Expression type is null!");
            }
            subExpression = selector.getSubExpression();
            type = selector.getType().name();
        }

        org.apache.rocketmq.client.consumer.MessageSelector messageSelector;
        if (org.apache.rocketmq.common.filter.ExpressionType.SQL92.equals(type)) {
            messageSelector = org.apache.rocketmq.client.consumer.MessageSelector.bySql(subExpression);
        } else if (org.apache.rocketmq.common.filter.ExpressionType.TAG.equals(type)) {
            messageSelector = org.apache.rocketmq.client.consumer.MessageSelector.byTag(subExpression);
        } else {
            throw new ONSClientException(String.format("Expression type %s is unknown!", type));
        }

        try {
            this.defaultMQPushConsumer.subscribe(topic, messageSelector);
        } catch (MQClientException e) {
            throw new ONSClientException("Consumer subscribe exception", e);
        }
    }