public void handler()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java [80:307]


    public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest) throws Exception {

        final AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
        final ChannelHandlerContext ctx = handlerSpecific.getCtx();
        final HttpEventWrapper requestWrapper = asyncContext.getRequest();

        final String localAddress = IPUtils.getLocalAddress();

        log.info("uri={}|{}|client2eventMesh|from={}|to={}",
            requestWrapper.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);

        // user request header
        final Map<String, Object> requestHeaderMap = requestWrapper.getHeaderMap();
        final String source = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        requestHeaderMap.put(ProtocolKey.ClientInstanceKey.IP.getKey(), source);

        // build sys header
        requestWrapper.buildSysHeaderForClient();

        // build cloudevents attributes
        requestHeaderMap.putIfAbsent("source", source);
        requestWrapper.buildSysHeaderForCE();

        final String bizNo = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(),
            RandomStringUtils.generateNum(32)).toString();
        final String uniqueId = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(),
            RandomStringUtils.generateNum(32)).toString();
        final String ttl = requestHeaderMap.getOrDefault(Constants.EVENTMESH_MESSAGE_CONST_TTL,
            14400000).toString();

        requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(), bizNo);
        requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(), uniqueId);
        requestWrapper.getSysHeaderMap().putIfAbsent(Constants.EVENTMESH_MESSAGE_CONST_TTL, ttl);

        final Map<String, Object> responseHeaderMap = new HashMap<>();
        responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
            localAddress);
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());

        final Map<String, Object> responseBodyMap = new HashMap<>();

        final String protocolType = requestHeaderMap.getOrDefault(ProtocolKey.PROTOCOL_TYPE,
            "http").toString();

        final ProtocolAdaptor<ProtocolTransportObject> httpProtocolAdaptor =
            ProtocolPluginFactory.getProtocolAdaptor(protocolType);

        CloudEvent event = httpProtocolAdaptor.toCloudEvent(requestWrapper);

        // validate event
        if (event == null
            || event.getSource() == null
            || event.getSpecVersion() == null
            || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {

            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));

            return;
        }

        final String idc = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.IDC.getKey())).toString();
        final String pid = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.PID.getKey())).toString();
        final String sys = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS.getKey())).toString();

        // validate event-extension

        if (StringUtils.isAnyBlank(idc, pid, sys)
            || !StringUtils.isNumeric(pid)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final String producerGroup = Objects.requireNonNull(
            event.getExtension(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey())).toString();
        final String topic = event.getSubject();

        Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(producerGroup + "-" + topic);
        Transformer transformer = eventMeshHTTPServer.getTransformerEngine().getTransformer(producerGroup + "-" + topic);

        // validate body
        if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
            || event.getData() == null) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final String token = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.TOKEN.getKey())).toString();
        // do acl check
        if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
            final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
            final String requestURI = requestWrapper.getRequestURI();
            String subsystem = Objects.requireNonNull(event.getExtension(ProtocolKey.ClientInstanceKey.SYS.getKey())).toString();
            try {
                EventMeshServicePubTopicInfo eventMeshServicePubTopicInfo = eventMeshHTTPServer.getEventMeshServer()
                    .getProducerTopicManager().getEventMeshServicePubTopicInfo(producerGroup);
                if (eventMeshServicePubTopicInfo == null) {
                    throw new AclException("no group register");
                }
                this.acl.doAclCheckInHttpSend(remoteAddr, token, subsystem, topic, requestURI, eventMeshServicePubTopicInfo);
            } catch (Exception e) {
                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
                    responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
                log.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
                return;
            }
        }

        // control flow rate limit
        if (!eventMeshHTTPServer.getMsgRateLimiter()
            .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final EventMeshProducer eventMeshProducer;
        if (StringUtils.isNotBlank(token)) {
            eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup, token);
        } else {
            eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
        }

        if (!eventMeshProducer.isStarted()) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
        if (Objects.requireNonNull(content).length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize()) {
            log.error("Event size exceeds the limit: {}", eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize());
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        try {
            event = CloudEventBuilder.from(event)
                .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .build();

            log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
        } catch (Exception e) {
            log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, eventMeshProducer, eventMeshHTTPServer);
        eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsg();

        final long startTime = System.currentTimeMillis();
        boolean isFiltered = true;
        try {
            event = CloudEventBuilder.from(sendMessageContext.getEvent())
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .build();
            handlerSpecific.getTraceOperation().createClientTraceOperation(EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event),
                EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false);
            if (filterPattern != null) {
                isFiltered = filterPattern.filter(JsonUtils.toJSONString(event));
            }

            // apply transformer
            if (isFiltered && transformer != null) {
                String data = transformer.transform(JsonUtils.toJSONString(event));
                event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
                    .getBytes(StandardCharsets.UTF_8)).build();
                sendMessageContext.setEvent(event);
            }

            if (isFiltered) {
                eventMeshProducer.send(sendMessageContext, new SendCallback() {

                    @Override
                    public void onSuccess(final SendResult sendResult) {
                        responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
                        responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);

                        log.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                            System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
                        handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
                        handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                    }

                    @Override
                    public void onException(final OnExceptionContext context) {
                        responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
                        responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                            + EventMeshUtil.stackTrace(context.getException(), 2));
                        eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                        handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
                            EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), sendMessageContext.getEvent()));

                        handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                        log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                            System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, context.getException());
                    }
                });
            } else {
                log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}|apply filter failed",
                    System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
                handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_FILTER_MSG_ERR, responseHeaderMap, responseBodyMap,
                    EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            }

        } catch (Exception ex) {
            eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, responseHeaderMap, responseBodyMap, null);

            final long endTime = System.currentTimeMillis();
            log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                endTime - startTime, topic, bizNo, uniqueId, ex);
        }
    }