in mqtt-cs/src/main/java/org/apache/rocketmq/mqtt/cs/protocol/mqtt/handler/MqttSubscribeHandler.java [82:129]
/**
 * Handles an MQTT SUBSCRIBE packet after the upstream hook has run.
 *
 * <p>If the upstream hook failed, the connection is closed with the hook's remark and nothing
 * else happens. Otherwise the requested topic subscriptions are registered with the session
 * loop, and the response built by {@code getResponse(mqttMessage)} is written once the
 * per-channel subscribe future completes. That future is stored on the channel under
 * {@code ChannelInfo.FUTURE_SUBSCRIBE} (presumably so another component can complete it
 * earlier - not visible here) and is force-completed by the scheduler after one second.
 * After the response is written, retained messages are sent for the new subscriptions.
 *
 * @param ctx                channel handler context of the subscribing client
 * @param mqttMessage        the SUBSCRIBE message being processed
 * @param upstreamHookResult result of the upstream hook; a failed result closes the connection
 */
public void doHandler(ChannelHandlerContext ctx, MqttSubscribeMessage mqttMessage, HookResult upstreamHookResult) {
    Channel channel = ctx.channel();
    String clientId = ChannelInfo.getClientId(channel);
    if (!upstreamHookResult.isSuccess()) {
        channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, upstreamHookResult.getRemark());
        return;
    }
    CompletableFuture<Void> future = new CompletableFuture<>();
    ChannelInfo.setFuture(channel, ChannelInfo.FUTURE_SUBSCRIBE, future);
    // Safety net: make sure the response is sent within one second even if nobody else
    // completes the future.
    scheduler.schedule(() -> {
        if (!future.isDone()) {
            future.complete(null);
        }
    }, 1, TimeUnit.SECONDS);
    try {
        MqttSubscribePayload payload = mqttMessage.payload();
        List<MqttTopicSubscription> mqttTopicSubscriptions = payload.topicSubscriptions();
        Set<Subscription> subscriptions = new HashSet<>();
        if (mqttTopicSubscriptions != null && !mqttTopicSubscriptions.isEmpty()) {
            for (MqttTopicSubscription mqttTopicSubscription : mqttTopicSubscriptions) {
                Subscription subscription = new Subscription();
                subscription.setQos(mqttTopicSubscription.qualityOfService().value());
                subscription.setTopicFilter(TopicUtils.normalizeTopic(mqttTopicSubscription.topicName()));
                subscriptions.add(subscription);
            }
            sessionLoop.addSubscription(ChannelInfo.getId(channel), subscriptions);
        }
        future.thenAccept(aVoid -> {
            if (!channel.isActive()) {
                return;
            }
            ChannelInfo.removeFuture(channel, ChannelInfo.FUTURE_SUBSCRIBE);
            channel.writeAndFlush(getResponse(mqttMessage));
            if (!subscriptions.isEmpty()) { // Write retained message
                try {
                    sendRetainMessage(ctx, subscriptions);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the executing thread's owner can observe it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                } catch (RemotingException | org.apache.rocketmq.remoting.exception.RemotingException e) {
                    throw new RuntimeException(e);
                }
            }
        }).exceptionally(throwable -> {
            // Failures inside the callback are captured by the dependent future and never reach
            // the surrounding try/catch; log them here so they are not silently dropped.
            logger.error("Subscribe callback failed:{}", clientId, throwable);
            return null;
        });
    } catch (Exception e) {
        logger.error("Subscribe:{}", clientId, e);
        channelManager.closeConnect(channel, ChannelCloseFrom.SERVER, "SubscribeException");
    }
}