public void processRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java [86:290]


    public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {

        String localAddress = IPUtils.getLocalAddress();
        HttpCommand request = asyncContext.getRequest();
        CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(request.getRequestCode())),
            EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);

        SendMessageBatchRequestHeader sendMessageBatchRequestHeader = (SendMessageBatchRequestHeader) request.getHeader();

        EventMeshHTTPConfiguration httpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
        SendMessageBatchResponseHeader sendMessageBatchResponseHeader = SendMessageBatchResponseHeader.buildHeader(
            Integer.valueOf(request.getRequestCode()), httpConfiguration.getEventMeshCluster(), localAddress, httpConfiguration.getEventMeshEnv(),
            httpConfiguration.getEventMeshIDC());

        String protocolType = sendMessageBatchRequestHeader.getProtocolType();
        ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
        List<CloudEvent> eventList = httpCommandProtocolAdaptor.toBatchCloudEvent(request);

        if (CollectionUtils.isEmpty(eventList)) {
            completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }

        String batchId = "";
        String producerGroup = "";
        int eventSize = eventList.size();

        if (eventSize > httpConfiguration.getEventMeshEventBatchSize()) {
            BATCH_MSG_LOGGER.error("Event batch size exceeds the limit: {}", httpConfiguration.getEventMeshEventBatchSize());
            completeResponse(request, asyncContext, sendMessageBatchResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
                "Event batch size exceeds the limit: " + httpConfiguration.getEventMeshEventBatchSize(), SendMessageBatchResponseBody.class);
            return;
        }

        for (CloudEvent event : eventList) {
            // validate event
            if (!ObjectUtils.allNotNull(event.getSource(), event.getSpecVersion())
                || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
                completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchResponseBody.class);
                return;
            }

            String content = event.getData() == null ? "" : new String(event.getData().toBytes(), Constants.DEFAULT_CHARSET);
            if (content.length() > httpConfiguration.getEventMeshEventSize()) {
                BATCH_MSG_LOGGER.error("Event size exceeds the limit: {}", httpConfiguration.getEventMeshEventSize());
                completeResponse(request, asyncContext, sendMessageBatchResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR,
                    "Event size exceeds the limit: " + httpConfiguration.getEventMeshEventSize(), SendMessageBatchResponseBody.class);
                return;
            }

            String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey());
            String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
            String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());

            // validate event-extension
            if (StringUtils.isAnyBlank(idc, pid, sys) || !StringUtils.isNumeric(pid)) {
                completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchResponseBody.class);
                return;
            }

            batchId = getExtension(event, SendMessageBatchRequestBody.BATCHID);
            producerGroup = getExtension(event, SendMessageBatchRequestBody.PRODUCERGROUP);
            eventSize = Integer.parseInt(getExtension(event, SendMessageBatchRequestBody.SIZE));
            CloudEventData eventData = event.getData();

            if (eventData == null || StringUtils.isAnyBlank(batchId, producerGroup) || eventSize != eventList.size()) {
                completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                    EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
                return;
            }

        }

        HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();
        if (!eventMeshHTTPServer.getBatchRateLimiter()
            .tryAcquire(eventSize, EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
            summaryMetrics.recordSendBatchMsgDiscard(eventSize);
            completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }

        EventMeshProducer batchEventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

        batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();

        if (!batchEventMeshProducer.isStarted()) {
            completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }

        final Stopwatch stopwatch = Stopwatch.createStarted();
        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        int requestCode = Integer.parseInt(request.getRequestCode());

        Map<String, List<CloudEvent>> topicBatchMessageMappings = new ConcurrentHashMap<>();

        for (CloudEvent cloudEvent : eventList) {
            if (StringUtils.isBlank(cloudEvent.getSubject()) || cloudEvent.getData() == null) {
                continue;
            }

            String user = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
            String pass = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
            String subsystem = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.SYS.getKey());

            // do acl check
            if (httpConfiguration.isEventMeshServerSecurityEnable()) {
                try {
                    this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, cloudEvent.getSubject(), requestCode);
                } catch (Exception e) {
                    completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                        EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageBatchResponseBody.class);
                    ACL_LOGGER.warn("CLIENT HAS NO PERMISSION,BatchSendMessageProcessor send failed", e);
                    return;
                }
            }

            try {
                String ttl = getExtension(cloudEvent, SendMessageRequestBody.TTL);

                if (StringUtils.isBlank(ttl) || !StringUtils.isNumeric(ttl)) {
                    cloudEvent = CloudEventBuilder.from(cloudEvent)
                        .withExtension(SendMessageRequestBody.TTL, String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS))
                        .withExtension("msgtype", "persistent")
                        .build();
                }

                if (topicBatchMessageMappings.containsKey(cloudEvent.getSubject())) {
                    topicBatchMessageMappings.get(cloudEvent.getSubject()).add(cloudEvent);
                } else {
                    List<CloudEvent> tmp = new ArrayList<>();
                    tmp.add(cloudEvent);
                    topicBatchMessageMappings.put(cloudEvent.getSubject(), tmp);
                }

                BATCH_MSG_LOGGER.debug("msg2MQMsg suc, event:{}", cloudEvent.getData());
            } catch (Exception e) {
                BATCH_MSG_LOGGER.error("msg2MQMsg err, event:{}", cloudEvent.getData(), e);
            }

        }

        if (CollectionUtils.isEmpty(eventList)) {
            completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
                EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
            return;
        }

        long delta = eventSize;
        summaryMetrics.recordSendBatchMsg(delta);

        if (httpConfiguration.isEventMeshServerBatchMsgBatchEnabled()) {
            for (List<CloudEvent> eventlist : topicBatchMessageMappings.values()) {
                // TODO: Implementation in API. Consider whether to put it in the plug-in.
                CloudEvent event = null;
                // TODO: Detect the maximum length of messages for different producers.
                final SendMessageContext sendMessageContext = new SendMessageContext(batchId, event, batchEventMeshProducer, eventMeshHTTPServer);
                batchEventMeshProducer.send(sendMessageContext, new SendCallback() {

                    @Override
                    public void onSuccess(SendResult sendResult) {
                    }

                    @Override
                    public void onException(OnExceptionContext context) {
                        BATCH_MSG_LOGGER.warn("", context.getException());
                        eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                    }

                });
            }
        } else {
            for (CloudEvent event : eventList) {
                final SendMessageContext sendMessageContext = new SendMessageContext(batchId, event, batchEventMeshProducer, eventMeshHTTPServer);
                batchEventMeshProducer.send(sendMessageContext, new SendCallback() {

                    @Override
                    public void onSuccess(SendResult sendResult) {

                    }

                    @Override
                    public void onException(OnExceptionContext context) {
                        BATCH_MSG_LOGGER.warn("", context.getException());
                        eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                    }

                });
            }
        }

        long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
        summaryMetrics.recordBatchSendMsgCost(elapsed);

        BATCH_MSG_LOGGER.debug("batchMessage|eventMesh2mq|REQ|ASYNC|batchId={}|send2MQCost={}ms|msgNum={}|topics={}",
            batchId, elapsed, eventSize, topicBatchMessageMappings.keySet());
        completeResponse(request, asyncContext, sendMessageBatchResponseHeader, EventMeshRetCode.SUCCESS, null,
            SendMessageBatchResponseBody.class);
        return;
    }