public void tryHTTPRequest()

in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java [93:286]


    /**
     * Pushes the event held by {@code handleMsgContext} to the URL returned by {@code getUrl()}.
     *
     * <p>The method performs the following steps in order:
     * <ol>
     *   <li>resolves the target URL and creates the POST request with the protocol headers;</li>
     *   <li>stamps the event with the push timestamp, response URL and response group;</li>
     *   <li>applies the filter pattern and the transformer registered under
     *       {@code consumerGroup + "-" + topic}, when present;</li>
     *   <li>converts the event through its protocol adaptor to obtain the push content;</li>
     *   <li>builds the form body, registers this request in the waiting map and executes it.</li>
     * </ol>
     *
     * <p>NOTE(review): when the URL is blank, the filter rejects the event, the transformer
     * fails or the protocol conversion fails, this method only logs and returns; it neither
     * schedules a retry nor finishes {@code handleMsgContext}. Confirm that callers expect this.
     */
    public void tryHTTPRequest() {

        currPushUrl = getUrl();

        if (StringUtils.isBlank(currPushUrl)) {
            LOGGER.warn("tryHTTPRequest fail, getUrl is null, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                this.handleMsgContext.getConsumerGroup(), this.handleMsgContext.getTopic(),
                this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId());
            return;
        }

        HttpPost builder = new HttpPost(currPushUrl);

        // The request code tells the client whether the subscription is SYNC or not.
        final String requestCode = SubscriptionType.SYNC == handleMsgContext.getSubscriptionItem().getType()
            ? String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode())
            : String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
        final String localAddress = IPUtils.getLocalAddress();
        addPushProtocolHeaders(builder, requestCode, localAddress);

        CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
            .withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
                String.valueOf(System.currentTimeMillis()))
            .withExtension(EventMeshConstants.RSP_URL, currPushUrl)
            .withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
            .build();

        if (!passesSubscriptionFilter(event)) {
            LOGGER.error("apply filter failed, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                this.handleMsgContext.getConsumerGroup(),
                this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId());
            return;
        }

        event = applySubscriptionTransformer(event);
        if (event == null) {
            // The transformer failed; the helper has already logged the cause.
            return;
        }
        handleMsgContext.setEvent(event);
        super.setEvent(event);

        final String content = resolvePushContent(event, builder);
        if (content == null) {
            // Protocol conversion failed; the helper has already logged the cause.
            return;
        }

        HttpEntity httpEntity = new UrlEncodedFormEntity(buildPushFormBody(content), Constants.DEFAULT_CHARSET);
        builder.setEntity(httpEntity);

        eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordPushMsg();

        // lastPushTime is read again by the response handler to compute the push cost.
        this.lastPushTime = System.currentTimeMillis();

        addToWaitingMap(this);

        CMD_LOGGER.info("cmd={}|eventMesh2client|from={}|to={}", requestCode, localAddress, currPushUrl);

        try {
            eventMeshHTTPServer.getHttpClientPool().getClient().execute(builder, response -> {
                removeWaitingMap(AsyncHTTPPushRequest.this);
                long cost = System.currentTimeMillis() - lastPushTime;
                eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPPushTimeCost(cost);

                if (processResponseStatus(response.getStatusLine().getStatusCode(), response)) {
                    // this is successful response, process response payload
                    handleSuccessfulPushResponse(response.getEntity(), cost);
                } else {
                    eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHttpPushMsgFailed();
                    MESSAGE_LOGGER.info("message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}",
                        currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
                    finishContextIfComplete();
                }
                // The handler's return value is not used by this method.
                return new Object();
            });

            if (MESSAGE_LOGGER.isDebugEnabled()) {
                MESSAGE_LOGGER.debug("message|eventMesh2client|url={}|topic={}|event={}",
                    currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getEvent());
            } else {
                MESSAGE_LOGGER.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
                    currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
            }
        } catch (IOException e) {
            MESSAGE_LOGGER.error("push2client err", e);
            removeWaitingMap(this);
            delayRetry();
            finishContextIfComplete();
        }
    }

    /**
     * Adds the request code, language, protocol version and EventMesh instance headers
     * (cluster, IP, env, IDC) to the push request.
     *
     * @param builder      the push request receiving the headers
     * @param requestCode  the request code header value
     * @param localAddress the address sent as the EventMesh IP header
     */
    private void addPushProtocolHeaders(HttpPost builder, String requestCode, String localAddress) {
        builder.addHeader(ProtocolKey.REQUEST_CODE, requestCode);
        builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
        builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
            handleMsgContext.getEventMeshHTTPServer()
                .getEventMeshHttpConfiguration().getEventMeshCluster());
        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, localAddress);
        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshEnv());
        builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
            handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshIDC());
    }

    /**
     * Checks the event against the filter pattern registered under {@code consumerGroup + "-" + topic}.
     *
     * @param event the event about to be pushed
     * @return {@code true} when no pattern is registered or the pattern accepts the JSON form of
     *         the event; {@code false} when the pattern rejects it
     */
    private boolean passesSubscriptionFilter(CloudEvent event) {
        Pattern filterPattern = eventMeshHTTPServer.getFilterEngine()
            .getFilterPattern(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
        return filterPattern == null || filterPattern.filter(JsonUtils.toJSONString(event));
    }

    /**
     * Applies the transformer registered under {@code consumerGroup + "-" + topic}, if any,
     * and stores the JSON-serialised transformer output as the event data.
     *
     * @param event the event about to be pushed
     * @return the event unchanged when no transformer is registered, a copy carrying the
     *         transformed data on success, or {@code null} when the transformation threw
     *         (the failure is logged here)
     */
    private CloudEvent applySubscriptionTransformer(CloudEvent event) {
        Transformer transformer = eventMeshHTTPServer.getTransformerEngine()
            .getTransformer(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
        if (transformer == null) {
            return event;
        }
        try {
            String data = transformer.transform(JsonUtils.toJSONString(event));
            return CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
                .getBytes(StandardCharsets.UTF_8)).build();
        } catch (Exception exception) {
            LOGGER.warn("apply transformer to cloudevents error, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                this.handleMsgContext.getConsumerGroup(),
                this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), exception);
            return null;
        }
    }

    /**
     * Converts the context's event through the protocol adaptor named by the event's
     * protocol-type extension and extracts the content to push.
     *
     * <p>For an {@code HttpCommand} the content is the {@code "content"} entry of its body map.
     * Otherwise the object is treated as an {@code HttpEventWrapper}: the content is its body
     * decoded with the default charset, and its system headers are copied onto the request
     * unless a header of the same name is already present.
     *
     * @param event   the event whose protocol-type extension selects the adaptor
     * @param builder the push request that may receive additional headers
     * @return the content to push, or {@code null} when anything in the conversion threw
     *         (the failure is logged here)
     */
    private String resolvePushContent(CloudEvent event, HttpPost builder) {
        try {
            String protocolType = Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_TYPE)).toString();

            ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);

            ProtocolTransportObject protocolTransportObject =
                protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
            if (protocolTransportObject instanceof HttpCommand) {
                return ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
            }

            HttpEventWrapper httpEventWrapper = (HttpEventWrapper) protocolTransportObject;
            String wrapperContent = new String(httpEventWrapper.getBody(), Constants.DEFAULT_CHARSET);
            httpEventWrapper.getSysHeaderMap().forEach((k, v) -> {
                if (!builder.containsHeader(k)) {
                    builder.addHeader(k, v.toString());
                }
            });
            return wrapperContent;
        } catch (Exception ex) {
            LOGGER.warn("cloudevent to HttpEventWrapper occur except, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
                this.handleMsgContext.getConsumerGroup(),
                this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), ex);
            return null;
        }
    }

    /**
     * Builds the form fields of the push request: content, bizSeqNo, uniqueId, random number,
     * topic and the JSON-serialised event properties.
     *
     * <p>A blank bizSeqNo or uniqueId is replaced by a generated 20-digit number.
     *
     * @param content the content field value
     * @return the form fields in the order listed above
     */
    private List<NameValuePair> buildPushFormBody(String content) {
        String bizSeqNo = handleMsgContext.getBizSeqNo();
        String uniqueId = handleMsgContext.getUniqueId();

        List<NameValuePair> body = new ArrayList<>();
        body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
        body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
            StringUtils.isBlank(bizSeqNo) ? RandomStringUtils.generateNum(20) : bizSeqNo));
        body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
            StringUtils.isBlank(uniqueId) ? RandomStringUtils.generateNum(20) : uniqueId));
        body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
            handleMsgContext.getMsgRandomNo()));
        body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
        body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
            JsonUtils.toJSONString(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
        return body;
    }

    /**
     * Reads the payload of a response whose status was accepted and reacts to the client's
     * return code: OK, REMOTE_OK and FAIL complete this request, RETRY and NOLISTEN schedule a
     * delayed retry, any other code is ignored.
     *
     * <p>NOTE(review): {@code entity} is passed to {@code EntityUtils.toString} as-is; confirm
     * that accepted statuses always carry an entity.
     *
     * @param entity the response entity holding the client's reply
     * @param cost   milliseconds elapsed since the push was sent, used for logging only
     */
    private void handleSuccessfulPushResponse(HttpEntity entity, long cost) {
        String res;
        try {
            res = EntityUtils.toString(entity, Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
        } catch (IOException e) {
            LOGGER.warn("handleResponse exception", e);
            // An unreadable payload finishes the context unconditionally.
            handleMsgContext.finish();
            return;
        }

        ClientRetCode result = processResponseContent(res);
        MESSAGE_LOGGER.info("message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}",
            result, currPushUrl, handleMsgContext.getTopic(),
            handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
        switch (result) {
            case OK:
            case REMOTE_OK:
            case FAIL:
                complete();
                finishContextIfComplete();
                break;
            case RETRY:
            case NOLISTEN:
                delayRetry();
                finishContextIfComplete();
                break;
            default: // do nothing
        }
    }

    /**
     * Finishes {@code handleMsgContext} when this request reports itself complete.
     */
    private void finishContextIfComplete() {
        if (isComplete()) {
            handleMsgContext.finish();
        }
    }