in ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java [153:178]
/**
 * Subscribes the wrapped push consumer to {@code topic}, translating the ONS
 * {@code MessageSelector} into its RocketMQ client counterpart.
 *
 * <p>When {@code selector} is {@code null}, a TAG filter with the expression
 * {@code "*"} is used.
 *
 * @param topic    topic name handed to the push consumer's {@code subscribe} call
 * @param selector filter description; may be {@code null}
 * @throws ONSClientException if the selector's type is {@code null}, if the type
 *                            name is neither SQL92 nor TAG, or if the push
 *                            consumer's {@code subscribe} call raises an
 *                            {@code MQClientException} (kept as the cause)
 */
protected void subscribe(final String topic, final MessageSelector selector) {
    final String expression;
    final String expressionType;
    if (selector == null) {
        // No selector supplied: fall back to a TAG filter with the "*" expression.
        expression = "*";
        expressionType = org.apache.rocketmq.common.filter.ExpressionType.TAG;
    } else {
        if (null == selector.getType()) {
            throw new ONSClientException("Expression type is null!");
        }
        expression = selector.getSubExpression();
        expressionType = selector.getType().name();
    }

    // Resolve the expression type name against the RocketMQ constants; SQL92 is checked first.
    final boolean sqlFilter =
        org.apache.rocketmq.common.filter.ExpressionType.SQL92.equals(expressionType);
    if (!sqlFilter && !org.apache.rocketmq.common.filter.ExpressionType.TAG.equals(expressionType)) {
        throw new ONSClientException(String.format("Expression type %s is unknown!", expressionType));
    }

    final org.apache.rocketmq.client.consumer.MessageSelector rocketSelector = sqlFilter
        ? org.apache.rocketmq.client.consumer.MessageSelector.bySql(expression)
        : org.apache.rocketmq.client.consumer.MessageSelector.byTag(expression);

    try {
        this.defaultMQPushConsumer.subscribe(topic, rocketSelector);
    } catch (MQClientException cause) {
        // Re-throw as the ONS exception type while keeping the original as the cause.
        throw new ONSClientException("Consumer subscribe exception", cause);
    }
}