public void processRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java [64:185]


    /**
     * Handles an HTTP subscribe request.
     *
     * <p>Steps, in order; each failed check completes the request with the matching
     * {@link EventMeshRetCode} and returns without touching subscription state:
     * <ol>
     *   <li>validate the header (idc, pid and sys must be non-blank, pid must be numeric);</li>
     *   <li>validate the body (url and consumer group non-blank, topic list non-empty);</li>
     *   <li>when server security is enabled, run the ACL check for every requested topic;</li>
     *   <li>validate the subscriber url against the configured IPv4/IPv6 black lists;</li>
     *   <li>register the client, update its subscription, notify the consumer manager and
     *       complete the request with {@code SUCCESS}.</li>
     * </ol>
     *
     * @param ctx          channel context; used to resolve the remote address and to send the response
     * @param asyncContext carries the incoming {@link HttpCommand} and is completed with the response
     * @throws Exception declared by the signature; exceptions raised by the ACL check, the url
     *                   validation and the consumer-manager notification are handled here and
     *                   turned into error responses, anything else propagates to the caller
     */
    public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext)
        throws Exception {
        final HttpCommand request = asyncContext.getRequest();
        final Integer requestCode = Integer.valueOf(request.getRequestCode());
        final String localAddress = IPUtils.getLocalAddress();
        final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
            RequestCode.get(requestCode), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);

        final SubscribeRequestHeader subscribeRequestHeader = (SubscribeRequestHeader) request.getHeader();
        final SubscribeRequestBody subscribeRequestBody = (SubscribeRequestBody) request.getBody();
        EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
        // Built up front because every exit path below (success or failure) needs it.
        final SubscribeResponseHeader subscribeResponseHeader =
            SubscribeResponseHeader
                .buildHeader(requestCode,
                    eventMeshHttpConfiguration.getEventMeshCluster(),
                    localAddress,
                    eventMeshHttpConfiguration.getEventMeshEnv(),
                    eventMeshHttpConfiguration.getEventMeshIDC());

        // validate header
        if (StringUtils.isAnyBlank(subscribeRequestHeader.getIdc(),
            subscribeRequestHeader.getPid(), subscribeRequestHeader.getSys())
            || !StringUtils.isNumeric(subscribeRequestHeader.getPid())) {
            completeResponse(request, asyncContext, subscribeResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SubscribeResponseBody.class);
            return;
        }

        // validate body
        if (StringUtils.isAnyBlank(subscribeRequestBody.getUrl(), subscribeRequestBody.getConsumerGroup())
            || CollectionUtils.isEmpty(subscribeRequestBody.getTopics())) {
            completeResponse(request, asyncContext, subscribeResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SubscribeResponseBody.class);
            return;
        }
        final List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();

        // do acl check: the first topic that fails rejects the whole request
        if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
            for (final SubscriptionItem item : subTopicList) {
                try {
                    this.acl.doAclCheckInHttpReceive(remoteAddr,
                        subscribeRequestHeader.getUsername(),
                        subscribeRequestHeader.getPasswd(),
                        subscribeRequestHeader.getSys(), item.getTopic(),
                        requestCode);
                } catch (Exception e) {
                    completeResponse(request, asyncContext, subscribeResponseHeader,
                        EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SubscribeResponseBody.class);
                    log.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
                    return;
                }
            }
        }

        final String url = subscribeRequestBody.getUrl();
        final String consumerGroup = subscribeRequestBody.getConsumerGroup();

        // validate URL: both a "false" result and an exception from the check are reported
        // to the client as the same body error
        try {
            if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(),
                eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) {
                log.error("subscriber url {} is not valid", url);
                completeResponse(request, asyncContext, subscribeResponseHeader,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url,
                    SubscribeResponseBody.class);
                return;
            }
        } catch (Exception e) {
            log.error("subscriber url:{} is invalid.", url, e);
            completeResponse(request, asyncContext, subscribeResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url,
                SubscribeResponseBody.class);
            return;
        }

        SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
        // Registration, subscription update, notification and the metadata update all run
        // while holding the monitor of the local client-info mapping.
        synchronized (subscriptionManager.getLocalClientInfoMapping()) {
            ClientInfo clientInfo = getClientInfo(subscribeRequestHeader);
            subscriptionManager.registerClient(clientInfo, consumerGroup, subTopicList, url);
            subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subTopicList);

            final long startTime = System.currentTimeMillis();
            try {
                // subscription relationship change notification
                eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
                    subscriptionManager.getLocalConsumerGroupMapping().get(consumerGroup));

                // Writes the response to the channel and records the request/response time cost.
                // Failures while responding are logged here and not rethrown.
                final CompleteHandler<HttpCommand> handler = httpCommand -> {
                    try {
                        log.debug("{}", httpCommand);
                        eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());

                        eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPReqResTimeCost(
                            System.currentTimeMillis() - request.getReqTime());
                    } catch (Exception ex) {
                        log.error("onResponse error", ex);
                    }
                };

                final HttpCommand responseEventMeshCommand =
                    request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
                asyncContext.onComplete(responseEventMeshCommand, handler);
            } catch (Exception e) {
                completeResponse(request, asyncContext, subscribeResponseHeader,
                    EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR,
                    EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
                    SubscribeResponseBody.class);
                final long endTime = System.currentTimeMillis();

                // One placeholder per value, with the exception passed last as in the other
                // log calls of this method. The previous message had two extra placeholders
                // (bizSeqNo/uniqueId), so the url was printed under the wrong label and one
                // placeholder was left unfilled.
                log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|url={}",
                    endTime - startTime,
                    JsonUtils.toJSONString(subscribeRequestBody.getTopics()),
                    subscribeRequestBody.getUrl(),
                    e);

                eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgFailed();
                eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgCost(endTime - startTime);
            }
            // Reached on both the success path and the handled-failure path above.
            eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
        }
    }