in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java [70:214]
public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext) throws Exception {
HttpCommand responseEventMeshCommand;
final HttpCommand request = asyncContext.getRequest();
final String localAddress = IPUtils.getLocalAddress();
log.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(request.getRequestCode())),
EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);
final UnSubscribeRequestHeader unSubscribeRequestHeader = (UnSubscribeRequestHeader) request.getHeader();
final UnSubscribeRequestBody unSubscribeRequestBody = (UnSubscribeRequestBody) request.getBody();
EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
final UnSubscribeResponseHeader unSubscribeResponseHeader = UnSubscribeResponseHeader.buildHeader(Integer.valueOf(request.getRequestCode()),
eventMeshHttpConfiguration.getEventMeshCluster(), localAddress, eventMeshHttpConfiguration.getEventMeshEnv(),
eventMeshHttpConfiguration.getEventMeshIDC());
// validate header
if (StringUtils.isAnyBlank(unSubscribeRequestHeader.getIdc(), unSubscribeRequestHeader.getPid(), unSubscribeRequestHeader.getSys())
|| !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())) {
completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null,
UnSubscribeResponseBody.class);
return;
}
// validate body
if (StringUtils.isAnyBlank(unSubscribeRequestBody.getUrl(), unSubscribeRequestBody.getConsumerGroup()) || CollectionUtils.isEmpty(
unSubscribeRequestBody.getTopics())) {
completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null,
UnSubscribeResponseBody.class);
return;
}
final String pid = unSubscribeRequestHeader.getPid();
final String consumerGroup = unSubscribeRequestBody.getConsumerGroup();
final String unSubscribeUrl = unSubscribeRequestBody.getUrl();
final List<String> unSubTopicList = unSubscribeRequestBody.getTopics();
final HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();
final CompleteHandler<HttpCommand> handler = httpCommand -> {
try {
log.debug("{}", httpCommand);
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
} catch (Exception ex) {
// ignore
}
};
SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMap = subscriptionManager.getLocalConsumerGroupMapping();
synchronized (subscriptionManager.getLocalClientInfoMapping()) {
boolean isChange = true;
registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);
for (final String unSubTopic : unSubTopicList) {
final List<Client> groupTopicClients = subscriptionManager.getLocalClientInfoMapping().get(consumerGroup + "@" + unSubTopic);
final Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
final Client client = clientIterator.next();
if (StringUtils.equals(client.getPid(), pid)
&& StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
log.warn("client {} start unsubscribe", JsonUtils.toJSONString(client));
clientIterator.remove();
}
}
if (CollectionUtils.isNotEmpty(groupTopicClients)) {
// change url
final Map<String, List<String>> idcUrls = new HashMap<>();
final Set<String> clientUrls = new HashSet<>();
for (final Client client : groupTopicClients) {
// remove subscribed url
if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
clientUrls.add(client.getUrl());
idcUrls.computeIfAbsent(client.getIdc(), k -> new ArrayList<>());
}
}
synchronized (localConsumerGroupMap) {
final ConsumerGroupConf consumerGroupConf = localConsumerGroupMap.get(consumerGroup);
final Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
for (final Map.Entry<String, ConsumerGroupTopicConf> topicConf : map.entrySet()) {
// only modify the topic to subscribe
if (StringUtils.equals(unSubTopic, topicConf.getKey())) {
final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(unSubTopic);
latestTopicConf.setSubscriptionItem(topicConf.getValue().getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);
latestTopicConf.setIdcUrls(idcUrls);
map.put(unSubTopic, latestTopicConf);
}
}
localConsumerGroupMap.put(consumerGroup, consumerGroupConf);
}
} else {
isChange = false;
break;
}
}
final long startTime = System.currentTimeMillis();
if (isChange) {
try {
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, localConsumerGroupMap.get(consumerGroup));
responseEventMeshCommand = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR,
EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2), UnSubscribeResponseBody.class);
final long endTime = System.currentTimeMillis();
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}", endTime - startTime,
JsonUtils.toJSONString(unSubscribeRequestBody.getTopics()), unSubscribeRequestBody.getUrl(), e);
summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);
}
} else {
// remove
try {
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, null);
responseEventMeshCommand = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
// clean ClientInfo
subscriptionManager.getLocalClientInfoMapping().keySet().removeIf(s -> StringUtils.contains(s, consumerGroup));
// clean ConsumerGroupInfo
localConsumerGroupMap.keySet().removeIf(s -> StringUtils.equals(consumerGroup, s));
} catch (Exception e) {
completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR,
EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2), UnSubscribeResponseBody.class);
final long endTime = System.currentTimeMillis();
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}", endTime - startTime,
JsonUtils.toJSONString(unSubscribeRequestBody.getTopics()), unSubscribeRequestBody.getUrl(), e);
summaryMetrics.recordSendMsgFailed();
summaryMetrics.recordSendMsgCost(endTime - startTime);
}
}
eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
}
}