public void handler()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java [81:315]


    /**
     * Handles an HTTP "send remote event" request: unwraps the event payload from the request body,
     * converts the request into a {@link CloudEvent}, validates it and hands it to the producer of this
     * mesh instance's producer group for asynchronous sending.
     *
     * <p>Processing order:
     * <ol>
     *   <li>stamp the request headers with the caller address and this instance's env / idc / sys / producer group;</li>
     *   <li>build the sys and CloudEvents headers on the request wrapper;</li>
     *   <li>replace the request body with the value of the {@code "content"} field of the JSON body
     *       (a missing body or missing field is answered with {@code EVENTMESH_PROTOCOL_BODY_ERR});</li>
     *   <li>default {@code bizSeqNo}, {@code uniqueId} and {@code ttl} when the caller did not supply them;</li>
     *   <li>convert to a CloudEvent through the protocol adaptor and validate attributes, extensions and data;</li>
     *   <li>run the ACL check (only when security is enabled), the rate limiter, the producer-started check
     *       and the event size check;</li>
     *   <li>send through the producer; the HTTP response is written from the send callback.</li>
     * </ol>
     * Every failed check writes an error response through {@code handlerSpecific} and returns without sending.
     *
     * @param handlerSpecific per-request handler state; supplies the async context, the channel context,
     *                        the trace operation and the response writers
     * @param httpRequest     the raw HTTP request; not read by this method
     * @throws Exception if any step outside the guarded send/ACL/build sections fails
     */
    public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest) throws Exception {

        final AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();

        final ChannelHandlerContext ctx = handlerSpecific.getCtx();

        final HttpEventWrapper requestWrapper = asyncContext.getRequest();

        final String localAddress = IPUtils.getLocalAddress();
        // remote address of the caller; resolved once and reused for logging, headers and the ACL check
        final String source = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        log.info("uri={}|{}|client2eventMesh|from={}|to={}",
            requestWrapper.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, source, localAddress);

        // response maps are prepared up front so that every early exit below can answer the caller
        final Map<String, Object> responseHeaderMap = new HashMap<>();
        responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, localAddress);
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
        responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());

        final Map<String, Object> responseBodyMap = new HashMap<>();

        // user request header
        final Map<String, Object> requestHeaderMap = requestWrapper.getHeaderMap();

        final String env = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv();
        // producer group of this mesh instance: <env>-<idc>-<cluster>-<sysId>
        final String meshGroup = new StringBuilder()
            .append(env)
            .append('-')
            .append(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC())
            .append('-')
            .append(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster())
            .append('-')
            .append(eventMeshHTTPServer.getEventMeshHttpConfiguration().getSysID())
            .toString();
        requestHeaderMap.put(ProtocolKey.ClientInstanceKey.IP.getKey(), source);
        requestHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV.getKey(), env);
        requestHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC.getKey(),
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
        requestHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS.getKey(),
            eventMeshHTTPServer.getEventMeshHttpConfiguration().getSysID());
        requestHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey(), meshGroup);

        // build sys header
        requestWrapper.buildSysHeaderForClient();

        // build cloudevents attributes
        requestHeaderMap.putIfAbsent("source", source);
        requestWrapper.buildSysHeaderForCE();

        // process remote event body: the actual event payload is carried in the "content" field
        final byte[] rawBody = requestWrapper.getBody();
        Map<String, Object> parsedBody = null;
        if (rawBody != null) {
            parsedBody = JsonUtils.parseTypeReferenceObject(
                new String(rawBody, Constants.DEFAULT_CHARSET),
                new TypeReference<Map<String, Object>>() {
                }
            );
        }
        final Map<String, Object> bodyMap = Optional.ofNullable(parsedBody).orElseGet(Maps::newHashMap);

        // a missing body or a body without "content" is a client error, not a server-side NullPointerException
        final Object content = bodyMap.get("content");
        if (content == null) {
            log.error("remote event body has no content field, uri={}", requestWrapper.getRequestURI());
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                responseBodyMap, null);
            return;
        }

        requestWrapper.setBody(content.toString().getBytes(StandardCharsets.UTF_8));

        // caller-supplied values win; otherwise fall back to generated ids and a 4 second ttl
        final String bizNo = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(),
            RandomStringUtils.generateNum(30)).toString();
        final String uniqueId = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(),
            RandomStringUtils.generateNum(30)).toString();
        final String ttl = requestHeaderMap.getOrDefault(Constants.EVENTMESH_MESSAGE_CONST_TTL,
            4 * 1000).toString();

        final Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
        sysHeaderMap.putIfAbsent(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(), bizNo);
        sysHeaderMap.putIfAbsent(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(), uniqueId);
        sysHeaderMap.putIfAbsent(Constants.EVENTMESH_MESSAGE_CONST_TTL, ttl);

        // drop from the user header map every key that is already present in the sys header map
        final Iterator<Map.Entry<String, Object>> it = requestHeaderMap.entrySet().iterator();
        while (it.hasNext()) {
            final String key = it.next().getKey();
            if (sysHeaderMap.containsKey(key)) {
                it.remove();
            }
        }

        final String protocolType = requestHeaderMap.getOrDefault(ProtocolKey.PROTOCOL_TYPE, "http").toString();

        final ProtocolAdaptor<ProtocolTransportObject> httpProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
        CloudEvent event = httpProtocolAdaptor.toCloudEvent(requestWrapper);

        // validate event
        if (event == null
            || StringUtils.isBlank(event.getId())
            || event.getSource() == null
            || event.getSpecVersion() == null
            || StringUtils.isBlank(event.getType())
            || StringUtils.isBlank(event.getSubject())) {

            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));

            return;
        }

        final String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
        final String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());

        // validate event-extension
        if (StringUtils.isBlank(getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey()))
            || StringUtils.isBlank(pid)
            || !StringUtils.isNumeric(pid)
            || StringUtils.isBlank(sys)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final String producerGroup = getExtension(event, ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey());
        // the CloudEvent subject is used as the destination topic
        final String topic = event.getSubject();

        // validate body
        if (StringUtils.isBlank(bizNo)
            || StringUtils.isBlank(uniqueId)
            || StringUtils.isBlank(producerGroup)
            || StringUtils.isBlank(topic)
            || event.getData() == null) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        // do acl check
        if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
            try {
                this.acl.doAclCheckInHttpSend(source,
                    getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME.getKey()),
                    getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD.getKey()),
                    getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey()),
                    topic,
                    requestWrapper.getRequestURI());
            } catch (Exception e) {
                handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
                    responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));

                log.error("CLIENT HAS NO PERMISSION,SendAsyncRemoteEventProcessor send failed", e);
                return;
            }
        }

        // control flow rate limit
        if (!eventMeshHTTPServer.getMsgRateLimiter()
            .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);

        if (!eventMeshProducer.isStarted()) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        // size check compares the decoded character count of the payload against the configured limit
        final String eventContent = event.getData() == null ? "" : new String(event.getData().toBytes(), StandardCharsets.UTF_8);
        if (eventContent.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize()) {
            log.error("Event size exceeds the limit: {}", eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize());
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        try {
            // tag the event as persistent and record the receive / forward timestamps as extensions
            event = CloudEventBuilder.from(event)
                .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
                .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .build();

            log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
        } catch (Exception e) {
            log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            return;
        }

        final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, eventMeshProducer, eventMeshHTTPServer);
        eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsg();

        final long startTime = System.currentTimeMillis();

        try {
            // this refreshed copy only feeds the trace span; the producer sends sendMessageContext's event
            event = CloudEventBuilder.from(sendMessageContext.getEvent())
                .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
                .build();
            handlerSpecific.getTraceOperation().createClientTraceOperation(EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event),
                EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false);

            eventMeshProducer.send(sendMessageContext, new SendCallback() {

                @Override
                public void onSuccess(final SendResult sendResult) {
                    responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
                    responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);

                    log.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                        System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
                    handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
                    handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                }

                @Override
                public void onException(final OnExceptionContext context) {
                    responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
                    responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                        + EventMeshUtil.stackTrace(context.getException(), 2));
                    // hand the failed context to the retryer with a 10 second delay
                    eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                    handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
                        EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), sendMessageContext.getEvent()));

                    handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);

                    log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                        System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, context.getException());
                }
            });
        } catch (Exception ex) {
            // the send call itself failed before any callback ran: schedule a retry and answer with an error
            eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, responseHeaderMap,
                responseBodyMap, null);

            log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, ex);
        }
    }