in rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/RocketMQUtil.java [287:328]
/**
 * Builds and configures a {@link DefaultLitePullConsumer} and subscribes it to a topic.
 *
 * <p>The consumer is created for {@code groupName}. When both {@code ak} and {@code sk}
 * have length, it is created with an {@link AclClientRPCHook} wrapping those credentials
 * and its VIP channel is disabled; otherwise a plain consumer is created. The name server
 * address, pull batch size, optional access channel, TLS flag and message model are then
 * applied, and finally the topic subscription is registered. This method does not start
 * the consumer.
 *
 * @param nameServer         name server address handed to {@code setNamesrvAddr}
 * @param accessChannel      name of an {@link AccessChannel} constant, resolved with
 *                           {@code AccessChannel.valueOf}; skipped when {@code null}
 * @param groupName          consumer group passed to the consumer constructor
 * @param topicName          topic to subscribe to
 * @param messageModel       {@code BROADCASTING} or {@code CLUSTERING}
 * @param selectorType       {@code TAG} to subscribe with {@code selectorExpression} as-is,
 *                           {@code SQL92} to wrap it with {@code MessageSelector.bySql}
 * @param selectorExpression expression used for the subscription
 * @param ak                 access key; ACL is used only if this and {@code sk} have length
 * @param sk                 secret key; ACL is used only if this and {@code ak} have length
 * @param pullBatchSize      value handed to {@code setPullBatchSize}
 * @param useTLS             value handed to {@code setUseTLS}
 * @return the configured and subscribed consumer
 * @throws MQClientException        declared by this method; presumably raised by the
 *                                  underlying subscribe call - confirm against the client API
 * @throws IllegalArgumentException if {@code messageModel} or {@code selectorType} is not
 *                                  one of the constants handled here
 */
public static DefaultLitePullConsumer createDefaultLitePullConsumer(String nameServer, String accessChannel,
        String groupName, String topicName, MessageModel messageModel, SelectorType selectorType,
        String selectorExpression, String ak, String sk, int pullBatchSize, boolean useTLS)
        throws MQClientException {
    // ACL is only used when both halves of the credential pair are present.
    final boolean aclEnabled = StringUtils.hasLength(ak) && StringUtils.hasLength(sk);

    final DefaultLitePullConsumer consumer;
    if (!aclEnabled) {
        consumer = new DefaultLitePullConsumer(groupName);
    } else {
        SessionCredentials credentials = new SessionCredentials(ak, sk);
        AclClientRPCHook aclHook = new AclClientRPCHook(credentials);
        consumer = new DefaultLitePullConsumer(groupName, aclHook);
        consumer.setVipChannelEnabled(false);
    }

    consumer.setNamesrvAddr(nameServer);
    consumer.setPullBatchSize(pullBatchSize);

    // The access channel is optional; a null name leaves the consumer's own setting untouched.
    if (accessChannel != null) {
        AccessChannel channel = AccessChannel.valueOf(accessChannel);
        consumer.setAccessChannel(channel);
    }

    consumer.setUseTLS(useTLS);

    // Translate the method's MessageModel argument into the client-side protocol enum.
    final org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel clientModel;
    switch (messageModel) {
        case CLUSTERING:
            clientModel = org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.CLUSTERING;
            break;
        case BROADCASTING:
            clientModel = org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING;
            break;
        default:
            throw new IllegalArgumentException("Property 'messageModel' was wrong.");
    }
    consumer.setMessageModel(clientModel);

    // Register the subscription last, once the consumer is fully configured.
    switch (selectorType) {
        case TAG:
            consumer.subscribe(topicName, selectorExpression);
            break;
        case SQL92:
            MessageSelector sqlSelector = MessageSelector.bySql(selectorExpression);
            consumer.subscribe(topicName, sqlSelector);
            break;
        default:
            throw new IllegalArgumentException("Property 'selectorType' was wrong.");
    }

    return consumer;
}