public void processRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java [73:286]


    /**
     * Handles a synchronous (request/reply) send-message HTTP command.
     *
     * <p>The incoming command is converted to a {@link CloudEvent} and then passes through, in order:
     * event attribute validation, client-extension validation, body validation (including the ttl
     * being a parseable integer), an optional ACL check, the message rate limiter, the event size
     * limit and the producer-started check. Each failing step completes the HTTP response with the
     * matching {@link EventMeshRetCode} and returns. Otherwise the event is handed to the producer of
     * the requested producer group and the HTTP response is completed from the reply callback.
     *
     * @param ctx          channel context the HTTP response is written to
     * @param asyncContext async context carrying the incoming {@link HttpCommand}
     * @throws Exception if any step outside the guarded try/catch sections fails
     */
    public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext)
        throws Exception {

        HttpCommand request = asyncContext.getRequest();
        final String localAddress = IPUtils.getLocalAddress();
        final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
            RequestCode.get(Integer.valueOf(request.getRequestCode())), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);

        SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) request.getHeader();

        String protocolType = sendMessageRequestHeader.getProtocolType();

        final ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);

        final CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(asyncContext.getRequest());

        EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
        // The response header is built once and reused by every response path below.
        final SendMessageResponseHeader sendMessageResponseHeader =
            SendMessageResponseHeader
                .buildHeader(Integer.valueOf(request.getRequestCode()),
                    eventMeshHttpConfiguration.getEventMeshCluster(),
                    localAddress,
                    eventMeshHttpConfiguration.getEventMeshEnv(),
                    eventMeshHttpConfiguration.getEventMeshIDC());

        // validate event
        // The null test on the event must short-circuit: its accessors are evaluated as call arguments,
        // so passing the event itself to allNotNull would still dereference a null event.
        if (event == null
            || !ObjectUtils.allNotNull(event.getSource(), event.getSpecVersion())
            || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
            return;
        }

        final String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey());
        final String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
        final String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());

        // validate event-extension
        if (StringUtils.isAnyBlank(idc, pid, sys) || !StringUtils.isNumeric(pid)) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
            return;
        }

        final String bizNo = getExtension(event, SendMessageRequestBody.BIZSEQNO);
        final String uniqueId = getExtension(event, SendMessageRequestBody.UNIQUEID);
        final String producerGroup = getExtension(event, SendMessageRequestBody.PRODUCERGROUP);
        final String topic = event.getSubject();
        final String ttl = getExtension(event, SendMessageRequestBody.TTL);

        // validate body
        if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic, ttl) || event.getData() == null) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageResponseBody.class);
            return;
        }

        // The ttl is passed to the producer as an int. Parse it here so a malformed value is rejected as
        // a body error up front, instead of surfacing later as a send failure that also schedules a retry.
        final int ttlValue;
        try {
            ttlValue = Integer.parseInt(ttl);
        } catch (NumberFormatException e) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, "ttl is not a valid integer",
                SendMessageResponseBody.class);
            return;
        }

        // do acl check
        if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
            final String user = getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
            final String pass = getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
            final int requestCode = Integer.parseInt(request.getRequestCode());

            try {
                this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, sys, topic, requestCode);
            } catch (Exception e) {
                completeResponse(request, asyncContext, sendMessageResponseHeader,
                    EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageResponseBody.class);
                log.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
                return;
            }
        }

        final HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();
        // control flow rate limit
        if (!eventMeshHTTPServer.getMsgRateLimiter()
            .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, null, SendMessageResponseBody.class);
            summaryMetrics.recordHTTPDiscard();
            return;
        }

        // The size limit is compared against the decoded content's character count.
        final String content = new String(Objects.requireNonNull(event.getData()).toBytes(), Constants.DEFAULT_CHARSET);
        int eventMeshEventSize = eventMeshHttpConfiguration.getEventMeshEventSize();
        if (content.length() > eventMeshEventSize) {
            log.error("Event size exceeds the limit: {}", eventMeshEventSize);
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                "Event size exceeds the limit: " + eventMeshEventSize,
                SendMessageResponseBody.class);
            return;
        }

        final EventMeshProducer eventMeshProducer =
            eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

        if (!eventMeshProducer.isStarted()) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, null, SendMessageResponseBody.class);
            return;
        }

        // Copy the event and stamp it with the message type and the request-side timestamps.
        CloudEvent newEvent;
        try {
            newEvent = CloudEventBuilder.from(event)
                .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .build();

            log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);

        } catch (Exception e) {
            log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, null, SendMessageResponseBody.class);
            return;
        }

        final SendMessageContext sendMessageContext =
            new SendMessageContext(bizNo, newEvent, eventMeshProducer, eventMeshHTTPServer);
        summaryMetrics.recordSendMsg();

        final long startTime = System.currentTimeMillis();

        // Writes the final HTTP response and records the request/response time cost. Failures while
        // responding are logged and not propagated.
        final CompleteHandler<HttpCommand> handler = httpCommand -> {
            try {
                log.debug("{}", httpCommand);
                eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
                eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPReqResTimeCost(
                    System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
            } catch (Exception ex) {
                log.error("onResponse error", ex);
                // ignore
            }
        };

        try {
            eventMeshProducer.request(sendMessageContext, new RequestReplyCallback() {

                @Override
                public void onSuccess(final CloudEvent event) {

                    log.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
                        + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);

                    try {
                        // Stamp the reply with the response-side timestamps before returning it.
                        final CloudEvent newEvent = CloudEventBuilder.from(event)
                            .withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP,
                                String.valueOf(System.currentTimeMillis()))
                            .withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
                                String.valueOf(System.currentTimeMillis()))
                            .build();

                        final String rtnMsg = new String(Objects.requireNonNull(newEvent.getData()).toBytes(),
                            Constants.DEFAULT_CHARSET);

                        final HttpCommand succ = request.createHttpCommandResponse(
                            sendMessageResponseHeader,
                            SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
                                JsonUtils.toJSONString(SendMessageResponseBody.ReplyMessage.builder()
                                    .topic(topic)
                                    .body(rtnMsg)
                                    .properties(EventMeshUtil.getEventProp(newEvent))
                                    .build())));
                        asyncContext.onComplete(succ, handler);
                    } catch (Exception ex) {
                        // The reply could not be turned into a response (e.g. it carries no data).
                        final HttpCommand err = request.createHttpCommandResponse(
                            sendMessageResponseHeader,
                            SendMessageResponseBody.buildBody(
                                EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
                                EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
                                    + EventMeshUtil.stackTrace(ex, 2)));
                        asyncContext.onComplete(err, handler);

                        log.warn("message|mq2eventMesh|RSP", ex);
                    }
                }

                @Override
                public void onException(final Throwable e) {
                    final HttpCommand err = request.createHttpCommandResponse(
                        sendMessageResponseHeader,
                        SendMessageResponseBody
                            .buildBody(EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
                                EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
                                    + EventMeshUtil.stackTrace(e, 2)));
                    asyncContext.onComplete(err, handler);

                    // The client has already been answered with an error; the send context is also
                    // handed to the retryer with a 10 second delay.
                    eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);

                    log.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
                        + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, e);

                }
            }, ttlValue);
        } catch (Exception ex) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR,
                EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2),
                SendMessageResponseBody.class);
            eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
            final long endTime = System.currentTimeMillis();
            summaryMetrics.recordSendMsgFailed();
            summaryMetrics.recordSendMsgCost(endTime - startTime);

            log.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                endTime - startTime, topic, bizNo, uniqueId, ex);
        }
    }