public void processRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncMessageProcessor.java [85:307]


    /**
     * Handles an asynchronous "send message" HTTP request.
     *
     * <p>The request is converted to a {@link CloudEvent} through the protocol adaptor selected by the
     * request header's protocol type, then passed through a series of guards. Each failing guard
     * completes the response with the matching {@link EventMeshRetCode}, calls {@code spanWithException}
     * and returns without sending:
     * <ol>
     *   <li>event attributes (source, spec version, id, type, subject);</li>
     *   <li>client extensions (idc, pid, sys; pid must be numeric);</li>
     *   <li>body extensions (bizSeqNo, uniqueId, producerGroup), topic and data;</li>
     *   <li>ACL check, only when server security is enabled;</li>
     *   <li>message rate limiter;</li>
     *   <li>producer for the producer group must be started;</li>
     *   <li>event size limit.</li>
     * </ol>
     * When all guards pass, the event is enriched with EventMesh extensions and handed to the producer.
     * The HTTP response is completed from the send callback. On a send failure (reported through the
     * callback or thrown synchronously) the send context is additionally scheduled on the HTTP retryer.
     *
     * @param ctx          channel context used to read the remote address and to write the response
     * @param asyncContext holder of the incoming {@link HttpCommand} and of the completion hook
     * @throws Exception if a step outside the guarded sections fails (e.g. the request code is not a
     *                   number or the protocol adaptor cannot convert the request)
     */
    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {

        String localAddress = IPUtils.getLocalAddress();
        HttpCommand request = asyncContext.getRequest();
        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        // Parse the request code once; it is needed for logging, the response header and the ACL check.
        final Integer requestCode = Integer.valueOf(request.getRequestCode());
        CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(requestCode),
            EventMeshConstants.PROTOCOL_HTTP,
            remoteAddr, localAddress);

        SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) request.getHeader();

        EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
        SendMessageResponseHeader sendMessageResponseHeader =
            SendMessageResponseHeader.buildHeader(requestCode,
                eventMeshHttpConfiguration.getEventMeshCluster(),
                localAddress, eventMeshHttpConfiguration.getEventMeshEnv(),
                eventMeshHttpConfiguration.getEventMeshIDC());

        String protocolType = sendMessageRequestHeader.getProtocolType();
        String protocolVersion = sendMessageRequestHeader.getProtocolVersion();
        ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
            ProtocolPluginFactory.getProtocolAdaptor(protocolType);
        CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(request);

        // Server-side span covering the whole request; it is finished in the send callbacks below.
        Span span = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
            EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_SERVER_SPAN, true);

        // validate event
        if (!ObjectUtils.allNotNull(event, event.getSource(), event.getSpecVersion())
            || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR);
            return;
        }

        String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey());
        String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
        String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());

        // validate event-extension
        if (StringUtils.isAnyBlank(idc, pid, sys)
            || !StringUtils.isNumeric(pid)) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR);
            return;
        }

        String bizNo = getExtension(event, SendMessageRequestBody.BIZSEQNO);
        String uniqueId = getExtension(event, SendMessageRequestBody.UNIQUEID);
        String producerGroup = getExtension(event, SendMessageRequestBody.PRODUCERGROUP);
        String topic = event.getSubject();

        // validate body
        if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
            || event.getData() == null) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageResponseBody.class);
            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR);
            return;
        }

        // do acl check
        if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
            String user = getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
            String pass = getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
            try {
                // The subsystem is the SYS extension already read (and validated) above.
                this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, sys, topic, requestCode);
            } catch (Exception e) {
                completeResponse(request, asyncContext, sendMessageResponseHeader,
                    EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageResponseBody.class);
                ACL_LOGGER.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);

                spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_ACL_ERR);
                return;
            }
        }

        final HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();
        // control flow rate limit
        if (!eventMeshHTTPServer.getMsgRateLimiter()
            .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, null, SendMessageResponseBody.class);
            summaryMetrics.recordHTTPDiscard();

            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR);
            return;
        }

        EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

        if (!eventMeshProducer.isStarted()) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, null, SendMessageResponseBody.class);
            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR);

            return;
        }

        // Fall back to the default TTL when the client sent none OR sent a non-numeric one.
        // (A blank string is never numeric, so combining the two checks with && would only ever
        // catch the blank case and let values such as "abc" through.)
        String ttl = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
        String ttlExt = getExtension(event, SendMessageRequestBody.TTL);
        if (StringUtils.isBlank(ttlExt) || !StringUtils.isNumeric(ttlExt)) {
            event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, ttl).build();
        }

        // Size limit is applied to the decoded character length of the payload.
        String content = event.getData() == null
            ? ""
            : new String(Objects.requireNonNull(event.getData()).toBytes(), Constants.DEFAULT_CHARSET);
        if (content.length() > eventMeshHttpConfiguration.getEventMeshEventSize()) {
            HTTP_LOGGER.error("Event size exceeds the limit: {}",
                eventMeshHttpConfiguration.getEventMeshEventSize());
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR,
                "Event size exceeds the limit: " + eventMeshHttpConfiguration.getEventMeshEventSize(),
                SendMessageResponseBody.class);
            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR);
            return;
        }

        try {
            // Stamp the event with message type, client->EventMesh timestamp and this server's IP.
            event = CloudEventBuilder.from(event)
                .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, request.reqTime)
                .withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP, eventMeshHttpConfiguration.getEventMeshServerIp())
                .build();

            MESSAGE_LOGGER.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
        } catch (Exception e) {
            MESSAGE_LOGGER.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR,
                EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
                SendMessageResponseBody.class);
            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR);
            return;
        }

        final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, eventMeshProducer,
            eventMeshHTTPServer);
        summaryMetrics.recordSendMsg();

        long startTime = System.currentTimeMillis();

        // Writes the final HTTP response and records the end-to-end request/response cost.
        final CompleteHandler<HttpCommand> handler = httpCommand -> {
            try {
                HTTP_LOGGER.debug("{}", httpCommand);
                eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());

                summaryMetrics.recordHTTPReqResTimeCost(
                    System.currentTimeMillis() - request.getReqTime());
            } catch (Exception ex) {
                // Best effort: a failed response write must not propagate into the send callback,
                // but it should still be visible in the logs.
                HTTP_LOGGER.warn("failed to write response for async send, bizSeqNo={}, uniqueId={}",
                    bizNo, uniqueId, ex);
            }
        };

        try {
            event = CloudEventBuilder.from(sendMessageContext.getEvent())
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .build();
            sendMessageContext.setEvent(event);

            // Client span covers only the hand-off to the producer; it is finished right after send() returns.
            Span clientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(protocolVersion, event),
                EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false);
            try {
                eventMeshProducer.send(sendMessageContext, new SendCallback() {

                    @Override
                    public void onSuccess(SendResult sendResult) {
                        HttpCommand succ = request.createHttpCommandResponse(
                            sendMessageResponseHeader,
                            SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
                                EventMeshRetCode.SUCCESS.getErrMsg() + sendResult));
                        asyncContext.onComplete(succ, handler);
                        long endTime = System.currentTimeMillis();
                        summaryMetrics.recordSendMsgCost(endTime - startTime);
                        MESSAGE_LOGGER.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                            endTime - startTime, topic, bizNo, uniqueId);

                        TraceUtils.finishSpan(span, sendMessageContext.getEvent());
                    }

                    @Override
                    public void onException(OnExceptionContext context) {
                        HttpCommand err = request.createHttpCommandResponse(
                            sendMessageResponseHeader,
                            SendMessageResponseBody.buildBody(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode(),
                                EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                                    + EventMeshUtil.stackTrace(context.getException(), 2)));
                        asyncContext.onComplete(err, handler);

                        // The client is answered with an error, and the message is also queued for a retry.
                        eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                        long endTime = System.currentTimeMillis();
                        summaryMetrics.recordSendMsgFailed();
                        summaryMetrics.recordSendMsgCost(endTime - startTime);
                        MESSAGE_LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                            endTime - startTime, topic, bizNo, uniqueId, context.getException());

                        TraceUtils.finishSpanWithException(span,
                            EventMeshUtil.getCloudEventExtensionMap(protocolVersion, sendMessageContext.getEvent()),
                            EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg(), context.getException());
                    }
                });
            } finally {
                TraceUtils.finishSpan(clientSpan, event);
            }

        } catch (Exception ex) {
            // Synchronous failure while building the event or invoking the producer.
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, null, SendMessageResponseBody.class);
            spanWithException(event, protocolVersion, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR);

            eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
            long endTime = System.currentTimeMillis();
            MESSAGE_LOGGER.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                endTime - startTime, topic, bizNo, uniqueId, ex);
            summaryMetrics.recordSendMsgFailed();
            summaryMetrics.recordSendMsgCost(endTime - startTime);
        }
    }