public void processRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java [66:183]


    /**
     * Handles an HTTP heartbeat request: validates the header and body, optionally runs the ACL check for
     * every reported topic, merges the reported clients into the subscription manager's local client
     * mapping and finally completes the request with a {@code SUCCESS} response.
     *
     * <p>Validation or ACL failures are answered through {@code completeResponse(...)} with the matching
     * {@link EventMeshRetCode} and the method returns without touching the local client mapping.
     *
     * @param ctx          channel context used to resolve the remote address and to write the response
     * @param asyncContext asynchronous context carrying the heartbeat {@link HttpCommand}
     * @throws Exception if a step outside the guarded sections fails (for example a request code that is
     *                   not a valid integer)
     */
    public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext) throws Exception {
        final String localAddress = IPUtils.getLocalAddress();
        final HttpCommand request = asyncContext.getRequest();
        // Parse the request code and the remote address once; both are reused below.
        final Integer requestCode = Integer.valueOf(request.getRequestCode());
        final String remoteAddress = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(requestCode),
            EventMeshConstants.PROTOCOL_HTTP, remoteAddress, localAddress);

        final HeartbeatRequestHeader heartbeatRequestHeader = (HeartbeatRequestHeader) request.getHeader();
        final HeartbeatRequestBody heartbeatRequestBody = (HeartbeatRequestBody) request.getBody();
        final EventMeshHTTPConfiguration httpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
        final HeartbeatResponseHeader heartbeatResponseHeader =
            HeartbeatResponseHeader.buildHeader(requestCode,
                httpConfiguration.getEventMeshCluster(),
                localAddress, httpConfiguration.getEventMeshEnv(),
                httpConfiguration.getEventMeshIDC());

        // validate header: idc, pid and sys are mandatory and pid must be numeric
        if (heartbeatRequestHeader == null
            || StringUtils.isAnyBlank(
                heartbeatRequestHeader.getIdc(), heartbeatRequestHeader.getPid(), heartbeatRequestHeader.getSys())
            || !StringUtils.isNumeric(heartbeatRequestHeader.getPid())) {
            completeResponse(request, asyncContext, heartbeatResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, HeartbeatResponseBody.class);
            return;
        }

        // validate body: client type, consumer group and at least one heartbeat entity are mandatory
        if (heartbeatRequestBody == null
            || StringUtils.isAnyBlank(heartbeatRequestBody.getClientType(), heartbeatRequestBody.getConsumerGroup())
            || CollectionUtils.isEmpty(heartbeatRequestBody.getHeartbeatEntities())) {
            completeResponse(request, asyncContext, heartbeatResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, HeartbeatResponseBody.class);
            return;
        }

        // Read the security switch once so every entity of this request is treated consistently.
        final boolean securityEnabled = httpConfiguration.isEventMeshServerSecurityEnable();

        // Clients reported by this request, grouped by "<consumerGroup>@<topic>".
        final ConcurrentHashMap<String, List<Client>> tmpMap = new ConcurrentHashMap<>();
        final List<HeartbeatRequestBody.HeartbeatEntity> heartbeatEntities = heartbeatRequestBody.getHeartbeatEntities();

        for (final HeartbeatRequestBody.HeartbeatEntity heartbeatEntity : heartbeatEntities) {
            if (heartbeatEntity == null) {
                continue;
            }

            final Client client = new Client();
            client.setEnv(heartbeatRequestHeader.getEnv());
            client.setIdc(heartbeatRequestHeader.getIdc());
            client.setSys(heartbeatRequestHeader.getSys());
            client.setIp(heartbeatRequestHeader.getIp());
            client.setPid(heartbeatRequestHeader.getPid());
            client.setConsumerGroup(heartbeatRequestBody.getConsumerGroup());
            client.setTopic(heartbeatEntity.getTopic());
            client.setUrl(heartbeatEntity.getUrl());
            client.setLastUpTime(new Date());

            // Entities without a topic or a callback url are silently skipped.
            if (StringUtils.isAnyBlank(client.getTopic(), client.getUrl())) {
                continue;
            }

            // do acl check
            if (securityEnabled) {
                try {
                    this.acl.doAclCheckInHttpHeartbeat(
                        remoteAddress,
                        heartbeatRequestHeader.getUsername(),
                        heartbeatRequestHeader.getPasswd(),
                        heartbeatRequestHeader.getSys(),
                        client.getTopic(),
                        Integer.parseInt(heartbeatRequestHeader.getCode()));
                } catch (Exception e) {
                    // The first rejected topic fails the whole heartbeat; nothing has been merged yet.
                    completeResponse(request, asyncContext, heartbeatResponseHeader,
                        EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), HeartbeatResponseBody.class);
                    log.warn("CLIENT HAS NO PERMISSION,HeartBeatProcessor subscribe failed", e);
                    return;
                }
            }

            final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
            tmpMap.computeIfAbsent(groupTopicKey, k -> new ArrayList<>()).add(client);
        }

        final ConcurrentHashMap<String, List<Client>> clientInfoMap =
            eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping();
        // The get-then-put sequence below is not atomic on its own, hence the lock on the shared map.
        synchronized (clientInfoMap) {
            for (final Map.Entry<String, List<Client>> groupTopicClientMapping : tmpMap.entrySet()) {
                final String groupTopicKey = groupTopicClientMapping.getKey();
                final List<Client> reportedClients = groupTopicClientMapping.getValue();
                final List<Client> localClientList = clientInfoMap.get(groupTopicKey);
                if (CollectionUtils.isEmpty(localClientList)) {
                    clientInfoMap.put(groupTopicKey, reportedClients);
                } else {
                    supplyClientInfoList(reportedClients, localClientList);
                    // NOTE(review): re-putting the existing list looks redundant; kept in case other
                    // writers modify the map without holding this lock - confirm before removing.
                    clientInfoMap.put(groupTopicKey, localClientList);
                }
            }
        }

        final long startTime = System.currentTimeMillis();
        try {
            final CompleteHandler<HttpCommand> handler = httpCommand -> {
                try {
                    log.debug("{}", httpCommand);
                    eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
                    eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPReqResTimeCost(
                        System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
                } catch (Exception ex) {
                    // Best effort: a failure here must not propagate, but it should not vanish either.
                    log.warn("HeartBeatProcessor failed to send heartbeat response or record its time cost", ex);
                }
            };
            asyncContext.onComplete(request.createHttpCommandResponse(EventMeshRetCode.SUCCESS), handler);
        } catch (Exception e) {
            completeResponse(request, asyncContext, heartbeatResponseHeader,
                EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR,
                EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
                HeartbeatResponseBody.class);
            final long elapsedTime = System.currentTimeMillis() - startTime;
            log.error("message|eventMesh2mq|REQ|ASYNC|heartBeatMessageCost={}ms", elapsedTime, e);
            eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgFailed();
            eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgCost(elapsedTime);
        }

    }