public void processRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/UnSubscribeProcessor.java [70:214]


    /**
     * Handles an HTTP unsubscribe request.
     *
     * <p>The request header and body are validated first; an invalid request is answered with the matching
     * protocol error code and nothing else happens. Otherwise the calling client (matched by pid and url) is
     * removed from every requested topic of its consumer group. If each requested topic still has clients,
     * the topic configurations are rebuilt from the remaining clients and handed to the consumer manager.
     * As soon as one requested topic has no clients left, the consumer manager is notified with a
     * {@code null} configuration and the group's local client and group entries are dropped.
     *
     * <p>All mutations run while holding the monitor of the subscription manager's local client-info
     * mapping, and {@code updateMetaData()} is invoked before that monitor is released.
     *
     * @param ctx          channel the request arrived on; the response is sent back through it
     * @param asyncContext carries the request and receives the response command
     * @throws Exception if a step outside the guarded notify/cleanup section fails
     */
    public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext) throws Exception {
        final HttpCommand request = asyncContext.getRequest();
        final String localAddress = IPUtils.getLocalAddress();
        // Parsed once; used for both the access log and the response header.
        final Integer requestCode = Integer.valueOf(request.getRequestCode());

        log.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(requestCode),
            EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);

        final UnSubscribeRequestHeader unSubscribeRequestHeader = (UnSubscribeRequestHeader) request.getHeader();
        final UnSubscribeRequestBody unSubscribeRequestBody = (UnSubscribeRequestBody) request.getBody();
        final EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
        final UnSubscribeResponseHeader unSubscribeResponseHeader = UnSubscribeResponseHeader.buildHeader(requestCode,
            eventMeshHttpConfiguration.getEventMeshCluster(), localAddress, eventMeshHttpConfiguration.getEventMeshEnv(),
            eventMeshHttpConfiguration.getEventMeshIDC());

        // validate header: idc, pid and sys must be present and pid must be numeric
        if (StringUtils.isAnyBlank(unSubscribeRequestHeader.getIdc(), unSubscribeRequestHeader.getPid(), unSubscribeRequestHeader.getSys())
            || !StringUtils.isNumeric(unSubscribeRequestHeader.getPid())) {
            completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null,
                UnSubscribeResponseBody.class);
            return;
        }

        // validate body: url, consumer group and at least one topic are required
        if (StringUtils.isAnyBlank(unSubscribeRequestBody.getUrl(), unSubscribeRequestBody.getConsumerGroup())
            || CollectionUtils.isEmpty(unSubscribeRequestBody.getTopics())) {
            completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null,
                UnSubscribeResponseBody.class);
            return;
        }

        final String pid = unSubscribeRequestHeader.getPid();
        final String consumerGroup = unSubscribeRequestBody.getConsumerGroup();
        final String unSubscribeUrl = unSubscribeRequestBody.getUrl();
        final List<String> unSubTopicList = unSubscribeRequestBody.getTopics();
        final HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();
        final CompleteHandler<HttpCommand> handler = httpCommand -> {
            try {
                log.debug("{}", httpCommand);
                eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
                summaryMetrics.recordHTTPReqResTimeCost(System.currentTimeMillis() - request.getReqTime());
            } catch (Exception ex) {
                // Best effort: a failed response write must not propagate, but it should not vanish either.
                log.warn("failed to send unsubscribe response", ex);
            }
        };

        final SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
        final ConcurrentHashMap<String, ConsumerGroupConf> localConsumerGroupMap = subscriptionManager.getLocalConsumerGroupMapping();
        synchronized (subscriptionManager.getLocalClientInfoMapping()) {
            // Flips to false as soon as one requested topic is left without any client.
            boolean isChange = true;

            registerClient(unSubscribeRequestHeader, consumerGroup, unSubTopicList, unSubscribeUrl);

            for (final String unSubTopic : unSubTopicList) {
                // Client-info keys have the form "<consumerGroup>@<topic>".
                final List<Client> groupTopicClients = subscriptionManager.getLocalClientInfoMapping().get(consumerGroup + "@" + unSubTopic);

                // Drop the caller's own registration for this topic.
                final Iterator<Client> clientIterator = groupTopicClients.iterator();
                while (clientIterator.hasNext()) {
                    final Client client = clientIterator.next();

                    if (StringUtils.equals(client.getPid(), pid)
                        && StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
                        log.warn("client {} start unsubscribe", JsonUtils.toJSONString(client));

                        clientIterator.remove();
                    }
                }

                if (CollectionUtils.isEmpty(groupTopicClients)) {
                    // Nobody is left on this topic: stop here and take the removal path below.
                    isChange = false;
                    break;
                }

                // Rebuild the url views from the remaining clients, skipping the unsubscribed url.
                final Map<String, List<String>> idcUrls = new HashMap<>();
                final Set<String> clientUrls = new HashSet<>();
                for (final Client client : groupTopicClients) {
                    if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
                        clientUrls.add(client.getUrl());
                        // The url has to be added to the per-IDC list; creating the list alone
                        // would leave every IDC entry empty.
                        idcUrls.computeIfAbsent(client.getIdc(), k -> new ArrayList<>()).add(client.getUrl());
                    }
                }

                synchronized (localConsumerGroupMap) {
                    final ConsumerGroupConf consumerGroupConf = localConsumerGroupMap.get(consumerGroup);

                    final Map<String, ConsumerGroupTopicConf> map = consumerGroupConf.getConsumerGroupTopicConf();
                    for (final Map.Entry<String, ConsumerGroupTopicConf> topicConf : map.entrySet()) {
                        // only replace the configuration of the topic being unsubscribed
                        if (StringUtils.equals(unSubTopic, topicConf.getKey())) {
                            final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
                            latestTopicConf.setConsumerGroup(consumerGroup);
                            latestTopicConf.setTopic(unSubTopic);
                            latestTopicConf.setSubscriptionItem(topicConf.getValue().getSubscriptionItem());
                            latestTopicConf.setUrls(clientUrls);
                            latestTopicConf.setIdcUrls(idcUrls);
                            map.put(unSubTopic, latestTopicConf);
                        }
                    }
                    localConsumerGroupMap.put(consumerGroup, consumerGroupConf);
                }
            }

            final long startTime = System.currentTimeMillis();
            try {
                if (isChange) {
                    eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, localConsumerGroupMap.get(consumerGroup));
                    asyncContext.onComplete(request.createHttpCommandResponse(EventMeshRetCode.SUCCESS), handler);
                } else {
                    // remove: a null configuration is passed for the group
                    // (presumably this tells the consumer manager to drop it - confirm against notifyConsumerManager)
                    eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup, null);
                    asyncContext.onComplete(request.createHttpCommandResponse(EventMeshRetCode.SUCCESS), handler);

                    // clean ClientInfo: match on the "<consumerGroup>@" key prefix so that other groups
                    // whose name merely contains this group's name are left untouched
                    final String groupKeyPrefix = consumerGroup + "@";
                    subscriptionManager.getLocalClientInfoMapping().keySet().removeIf(s -> StringUtils.startsWith(s, groupKeyPrefix));
                    // clean ConsumerGroupInfo
                    localConsumerGroupMap.remove(consumerGroup);
                }
            } catch (Exception e) {
                completeResponse(request, asyncContext, unSubscribeResponseHeader, EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR,
                    EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2), UnSubscribeResponseBody.class);
                final long endTime = System.currentTimeMillis();
                log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}", endTime - startTime,
                    JsonUtils.toJSONString(unSubscribeRequestBody.getTopics()), unSubscribeRequestBody.getUrl(), e);
                summaryMetrics.recordSendMsgFailed();
                summaryMetrics.recordSendMsgCost(endTime - startTime);
            }
            subscriptionManager.updateMetaData();
        }
    }