public void processRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java [79:256]


    /**
     * Handles one batch-send (V2) HTTP request.
     *
     * <p>The request is converted to a {@link CloudEvent} through the protocol adaptor selected by the
     * request header, then passed through a series of guards: required CloudEvent attributes,
     * client-instance extensions (idc/pid/sys), body fields (bizSeqNo/topic/producerGroup/data),
     * event size, the optional ACL check, the batch rate limiter and the producer's started state.
     * Each failing guard completes the request with the matching {@link EventMeshRetCode} and returns.
     *
     * <p>When all guards pass, the event is enriched with TTL, message type and timestamp extensions
     * and submitted to the producer with an asynchronous callback. The request is completed with
     * {@code SUCCESS} as soon as the send has been submitted without throwing; the callback itself only
     * records metrics, logs, and (on failure) schedules a retry.
     *
     * @param ctx          channel context, used to resolve the remote address for logging and the ACL check
     * @param asyncContext holder of the inbound {@link HttpCommand}; handed to {@code completeResponse}
     * @throws Exception if any call outside the guarded try blocks fails (e.g. protocol conversion)
     */
    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
        throws Exception {
        final HttpCommand request = asyncContext.getRequest();
        final Integer requestCode = Integer.valueOf(request.getRequestCode());

        // NOTE(review): from= and to= are both the channel's remote address here — confirm whether
        // to= was meant to be this server's local address.
        CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}",
            RequestCode.get(requestCode),
            EventMeshConstants.PROTOCOL_HTTP,
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

        SendMessageBatchV2RequestHeader sendMessageBatchV2RequestHeader =
            (SendMessageBatchV2RequestHeader) request.getHeader();

        // The protocol type declared in the header selects the adaptor that builds the CloudEvent.
        String protocolType = sendMessageBatchV2RequestHeader.getProtocolType();
        ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
            ProtocolPluginFactory.getProtocolAdaptor(protocolType);
        CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(request);

        // The response header is built once up front and reused by every completion path below.
        EventMeshHTTPConfiguration httpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
        SendMessageBatchV2ResponseHeader sendMessageBatchV2ResponseHeader =
            SendMessageBatchV2ResponseHeader.buildHeader(
                requestCode,
                httpConfiguration.getEventMeshCluster(),
                httpConfiguration.getEventMeshEnv(),
                httpConfiguration.getEventMeshIDC());

        // todo: use validate processor to check
        // validate event
        if (!ObjectUtils.allNotNull(event.getSource(), event.getSpecVersion())
            || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }

        String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey());
        String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
        String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());

        // validate event-extension
        if (StringUtils.isAnyBlank(idc, pid, sys)
            || !StringUtils.isNumeric(pid)) {
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }

        String bizNo = getExtension(event, SendMessageBatchV2RequestBody.BIZSEQNO);
        String producerGroup = getExtension(event, SendMessageBatchV2RequestBody.PRODUCERGROUP);
        // The CloudEvent subject is used as the destination topic.
        String topic = event.getSubject();

        if (StringUtils.isAnyBlank(bizNo, topic, producerGroup)
            || event.getData() == null) {
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }

        // NOTE(review): the limit is compared against the decoded character count, not the byte
        // count — confirm which unit getEventMeshEventSize() is meant to express.
        String content = new String(Objects.requireNonNull(event.getData()).toBytes(), Constants.DEFAULT_CHARSET);
        if (content.length() > httpConfiguration.getEventMeshEventSize()) {
            BATCH_MESSAGE_LOGGER.error("Event size exceeds the limit: {}", httpConfiguration.getEventMeshEventSize());
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                "Event size exceeds the limit: " + httpConfiguration.getEventMeshEventSize(),
                SendMessageBatchV2ResponseBody.class);
            return;
        }

        // do acl check
        if (httpConfiguration.isEventMeshServerSecurityEnable()) {
            String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            String user = getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
            String pass = getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
            try {
                // sys was already read from the SYS extension above; reuse it as the subsystem.
                this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, sys, topic, requestCode);
            } catch (Exception e) {
                completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                    EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageBatchV2ResponseBody.class);
                ACL_LOGGER.warn("CLIENT HAS NO PERMISSION,BatchSendMessageV2Processor send failed", e);
                return;
            }
        }

        // Fail fast when no rate-limiter permit becomes available within the fast-fail timeout.
        HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();
        if (!eventMeshHTTPServer.getBatchRateLimiter()
            .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
            summaryMetrics.recordSendBatchMsgDiscard(1);
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }

        EventMeshProducer batchEventMeshProducer =
            eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
        batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
        if (!batchEventMeshProducer.isStarted()) {
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR, null, SendMessageBatchV2ResponseBody.class);
            return;
        }

        // Start of the send-cost measurement reported by the callback and the catch block below.
        long batchStartTime = System.currentTimeMillis();

        String defaultTTL = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
        // todo: use hashmap to avoid copy
        String ttlValue = getExtension(event, SendMessageRequestBody.TTL);
        // Apply the default TTL when the client sent none OR sent a non-numeric one. The previous
        // "&&" only covered the blank case and let values such as "abc" through unchanged.
        if (StringUtils.isBlank(ttlValue) || !StringUtils.isNumeric(ttlValue)) {
            event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, defaultTTL)
                .build();
        }

        try {
            event = CloudEventBuilder.from(event)
                .withExtension("msgtype", "persistent")
                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP,
                    String.valueOf(System.currentTimeMillis()))
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP,
                    String.valueOf(System.currentTimeMillis()))
                .build();
            BATCH_MESSAGE_LOGGER.debug("msg2MQMsg suc, topic:{}, msg:{}", topic, event.getData());

        } catch (Exception e) {
            BATCH_MESSAGE_LOGGER.error("msg2MQMsg err, topic:{}, msg:{}", topic, event.getData(), e);
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR,
                EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg()
                    +
                    EventMeshUtil.stackTrace(e, 2),
                SendMessageBatchV2ResponseBody.class);
            return;
        }

        summaryMetrics.recordSendBatchMsg(1);

        final SendMessageContext sendMessageContext =
            new SendMessageContext(bizNo, event, batchEventMeshProducer, eventMeshHTTPServer);

        try {
            batchEventMeshProducer.send(sendMessageContext, new SendCallback() {

                @Override
                public void onSuccess(SendResult sendResult) {
                    long batchEndTime = System.currentTimeMillis();
                    summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
                    BATCH_MESSAGE_LOGGER.debug(
                        "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
                        bizNo, batchEndTime - batchStartTime, topic);
                }

                @Override
                public void onException(OnExceptionContext context) {
                    long batchEndTime = System.currentTimeMillis();
                    // Hand the failed message to the retryer with a 10 second delay.
                    eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                    summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
                    BATCH_MESSAGE_LOGGER.error(
                        "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
                        bizNo, batchEndTime - batchStartTime, topic, context.getException());
                }

            });
        } catch (Exception e) {
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader, EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR,
                EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR.getErrMsg()
                    +
                    EventMeshUtil.stackTrace(e, 2),
                SendMessageBatchV2ResponseBody.class);
            long batchEndTime = System.currentTimeMillis();
            eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
            summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
            BATCH_MESSAGE_LOGGER.error(
                "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
                bizNo, batchEndTime - batchStartTime, topic, e);
            // The error response has already been issued; do not fall through to the SUCCESS
            // completion below, which would complete the same request a second time.
            return;
        }

        // The send was submitted without throwing: acknowledge now, without waiting for the callback.
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
            EventMeshRetCode.SUCCESS, null, SendMessageBatchV2ResponseBody.class);
    }