public void handler()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java [70:229]


    /**
     * Handles a local HTTP unsubscribe request.
     *
     * <p>Steps performed, in order:
     * <ol>
     *   <li>Stores the caller's remote address in the request header map and builds the system headers.</li>
     *   <li>Sends {@code EVENTMESH_PROTOCOL_HEADER_ERR} / {@code EVENTMESH_PROTOCOL_BODY_ERR} and returns when
     *       {@code validateSysHeader} / {@code validatedRequestBodyMap} return {@code true}.</li>
     *   <li>While holding the monitor of the local client-info mapping, removes every client whose pid and url
     *       match the request from each {@code consumerGroup@topic} entry and rebuilds the topic configuration
     *       from the clients that remain.</li>
     *   <li>Notifies the consumer manager (with the updated group configuration, or with {@code null} when a
     *       topic was left without clients), sends the response, and updates the service metadata.</li>
     * </ol>
     *
     * @param handlerSpecific per-request handle giving access to the async context, the channel context and the
     *                        response helpers
     * @param httpRequest     the raw HTTP request; not read by this method
     * @throws Exception if any step outside the notify/respond {@code try} block fails
     */
    public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest) throws Exception {

        final AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
        final ChannelHandlerContext ctx = handlerSpecific.getCtx();
        final HttpEventWrapper requestWrapper = asyncContext.getRequest();

        final String localAddress = IPUtils.getLocalAddress();
        final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());

        log.info("uri={}|{}|client2eventMesh|from={}|to={}",
            requestWrapper.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);

        // user request header
        requestWrapper.getHeaderMap().put(ProtocolKey.ClientInstanceKey.IP.getKey(), remoteAddr);

        // build sys header
        requestWrapper.buildSysHeaderForClient();

        final Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
        final Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
        final Map<String, Object> responseBodyMap = new HashMap<>();

        // validate header: a true result is treated as "invalid" here
        if (validateSysHeader(sysHeaderMap)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
                responseBodyMap, null);
            return;
        }

        // validate body
        final byte[] requestBody = requestWrapper.getBody();

        // A request without a body must go through the normal body validation instead of failing with a
        // NullPointerException while decoding, so it is treated as an empty map.
        final Map<String, Object> requestBodyMap;
        if (requestBody == null) {
            requestBodyMap = new HashMap<>();
        } else {
            requestBodyMap = Optional.ofNullable(JsonUtils.parseTypeReferenceObject(
                new String(requestBody, Constants.DEFAULT_CHARSET),
                new TypeReference<HashMap<String, Object>>() {
                })).orElseGet(Maps::newHashMap);
        }

        // a true result is treated as "invalid" here
        if (validatedRequestBodyMap(requestBodyMap)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                responseBodyMap, null);
            return;
        }

        final String unSubscribeUrl = requestBodyMap.get(EventMeshConstants.URL).toString();
        final String consumerGroup = requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP).toString();

        // unSubscriptionItem: the topic entry is re-serialized and parsed as a list of topic names
        final List<String> unSubTopicList = Optional.ofNullable(JsonUtils.parseTypeReferenceObject(
            JsonUtils.toJSONString(requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC)),
            new TypeReference<List<String>>() {
            })).orElseGet(Collections::emptyList);

        final String pid = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID.getKey()).toString();

        // Client-info keys have the form "<consumerGroup>@<topic>".
        final String groupKeyPrefix = consumerGroup + "@";

        synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
            // Stays true while every processed topic still has at least one client after the removal.
            boolean isChange = true;

            registerClient(requestWrapper, consumerGroup, unSubTopicList, unSubscribeUrl);

            for (final String unSubTopic : unSubTopicList) {
                // NOTE(review): dereferenced without a null check; presumably registerClient guarantees an
                // entry for every requested topic — confirm.
                final List<Client> groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
                    .get(groupKeyPrefix + unSubTopic);

                // Drop the client(s) identified by both pid and url.
                final Iterator<Client> clientIterator = groupTopicClients.iterator();
                while (clientIterator.hasNext()) {
                    final Client client = clientIterator.next();
                    if (StringUtils.equals(client.getPid(), pid)
                        && StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
                        log.warn("client {} start unsubscribe", JsonUtils.toJSONString(client));
                        clientIterator.remove();
                    }
                }

                if (CollectionUtils.isEmpty(groupTopicClients)) {
                    // NOTE(review): the first topic left without clients stops the loop and causes the whole
                    // consumer group to be removed below, even if other topics still have clients — confirm
                    // this is intended.
                    isChange = false;
                    break;
                }

                // change url: collect the urls of the remaining clients, skipping the unsubscribed url even when
                // it belongs to a client with a different pid
                final Map<String, List<String>> idcUrls = new HashMap<>();
                final Set<String> clientUrls = new HashSet<>();
                for (final Client client : groupTopicClients) {
                    // remove subscribed url
                    if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
                        clientUrls.add(client.getUrl());

                        final List<String> urls = idcUrls.computeIfAbsent(client.getIdc(), idc -> new ArrayList<>());
                        urls.add(StringUtils.deleteWhitespace(client.getUrl()));
                    }
                }

                synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) {
                    // NOTE(review): dereferenced without a null check; assumes a group configuration exists
                    // whenever clients are registered for the group — confirm.
                    final ConsumerGroupConf consumerGroupConf =
                        eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup);
                    final Map<String, ConsumerGroupTopicConf> topicConfMap =
                        consumerGroupConf.getConsumerGroupTopicConf();
                    for (final Map.Entry<String, ConsumerGroupTopicConf> entry : topicConfMap.entrySet()) {
                        // only modify the topic being unsubscribed
                        if (StringUtils.equals(unSubTopic, entry.getKey())) {
                            final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
                            latestTopicConf.setConsumerGroup(consumerGroup);
                            latestTopicConf.setTopic(unSubTopic);
                            latestTopicConf.setSubscriptionItem(entry.getValue().getSubscriptionItem());
                            latestTopicConf.setUrls(clientUrls);
                            latestTopicConf.setIdcUrls(idcUrls);
                            // Replace the value through the entry rather than calling put on the map that is
                            // being iterated.
                            entry.setValue(latestTopicConf);
                        }
                    }
                    eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().put(consumerGroup, consumerGroupConf);
                }
            }

            final long startTime = System.currentTimeMillis();
            try {
                if (isChange) {
                    eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
                        eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
                } else {
                    // remove
                    eventMeshHTTPServer.getConsumerManager()
                        .notifyConsumerManager(consumerGroup, null);
                }

                responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
                responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());

                handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);

                if (!isChange) {
                    // clean ClientInfo: match on the "<consumerGroup>@" prefix so that entries of other groups
                    // whose key merely contains this group's name as a substring are left alone
                    eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet()
                        .removeIf(s -> StringUtils.startsWith(s, groupKeyPrefix));
                    // clean ConsumerGroupInfo
                    eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet()
                        .removeIf(s -> StringUtils.equals(consumerGroup, s));
                }
            } catch (Exception e) {
                log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
                    + "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e);

                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap,
                    responseBodyMap, null);
            }

            // Update service metadata
            eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
        }
    }