in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/push/AsyncHTTPPushRequest.java [93:286]
public void tryHTTPRequest() {
currPushUrl = getUrl();
if (StringUtils.isBlank(currPushUrl)) {
LOGGER.warn("tryHTTPRequest fail, getUrl is null, group:{}, topic:{}, bizSeqNo={}, uniqueId={}", this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId());
return;
}
HttpPost builder = new HttpPost(currPushUrl);
String requestCode = "";
if (SubscriptionType.SYNC == handleMsgContext.getSubscriptionItem().getType()) {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_SYNC.getRequestCode());
} else {
requestCode = String.valueOf(RequestCode.HTTP_PUSH_CLIENT_ASYNC.getRequestCode());
}
String localAddress = IPUtils.getLocalAddress();
builder.addHeader(ProtocolKey.REQUEST_CODE, requestCode);
builder.addHeader(ProtocolKey.LANGUAGE, Constants.LANGUAGE_JAVA);
builder.addHeader(ProtocolKey.VERSION, ProtocolVersion.V1.getVersion());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
handleMsgContext.getEventMeshHTTPServer()
.getEventMeshHttpConfiguration().getEventMeshCluster());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, localAddress);
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshEnv());
builder.addHeader(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
handleMsgContext.getEventMeshHTTPServer().getEventMeshHttpConfiguration().getEventMeshIDC());
CloudEvent event = CloudEventBuilder.from(handleMsgContext.getEvent())
.withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP,
String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.RSP_URL, currPushUrl)
.withExtension(EventMeshConstants.RSP_GROUP, handleMsgContext.getConsumerGroup())
.build();
Pattern filterPattern = eventMeshHTTPServer.getFilterEngine()
.getFilterPattern(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
if (filterPattern != null) {
if (!filterPattern.filter(JsonUtils.toJSONString(event))) {
LOGGER.error("apply filter failed, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId());
return;
}
}
Transformer transformer = eventMeshHTTPServer.getTransformerEngine()
.getTransformer(handleMsgContext.getConsumerGroup() + "-" + handleMsgContext.getTopic());
if (transformer != null) {
try {
String data = transformer.transform(JsonUtils.toJSONString(event));
event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
.getBytes(StandardCharsets.UTF_8)).build();
} catch (Exception exception) {
LOGGER.warn("apply transformer to cloudevents error, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), exception);
return;
}
}
handleMsgContext.setEvent(event);
super.setEvent(event);
String content = "";
try {
String protocolType = Objects.requireNonNull(event.getExtension(Constants.PROTOCOL_TYPE)).toString();
ProtocolAdaptor<ProtocolTransportObject> protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
ProtocolTransportObject protocolTransportObject =
protocolAdaptor.fromCloudEvent(handleMsgContext.getEvent());
if (protocolTransportObject instanceof HttpCommand) {
content = ((HttpCommand) protocolTransportObject).getBody().toMap().get("content").toString();
} else {
HttpEventWrapper httpEventWrapper = (HttpEventWrapper) protocolTransportObject;
content = new String(httpEventWrapper.getBody(), Constants.DEFAULT_CHARSET);
httpEventWrapper.getSysHeaderMap().forEach((k, v) -> {
if (!builder.containsHeader(k)) {
builder.addHeader(k, v.toString());
}
});
}
} catch (Exception ex) {
LOGGER.warn("cloudevent to HttpEventWrapper occur except, group:{}, topic:{}, bizSeqNo={}, uniqueId={}",
this.handleMsgContext.getConsumerGroup(),
this.handleMsgContext.getTopic(), this.handleMsgContext.getBizSeqNo(), this.handleMsgContext.getUniqueId(), ex);
return;
}
List<NameValuePair> body = new ArrayList<>();
body.add(new BasicNameValuePair(PushMessageRequestBody.CONTENT, content));
if (StringUtils.isBlank(handleMsgContext.getBizSeqNo())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.BIZSEQNO,
handleMsgContext.getBizSeqNo()));
}
if (StringUtils.isBlank(handleMsgContext.getUniqueId())) {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
RandomStringUtils.generateNum(20)));
} else {
body.add(new BasicNameValuePair(PushMessageRequestBody.UNIQUEID,
handleMsgContext.getUniqueId()));
}
body.add(new BasicNameValuePair(PushMessageRequestBody.RANDOMNO,
handleMsgContext.getMsgRandomNo()));
body.add(new BasicNameValuePair(PushMessageRequestBody.TOPIC, handleMsgContext.getTopic()));
body.add(new BasicNameValuePair(PushMessageRequestBody.EXTFIELDS,
JsonUtils.toJSONString(EventMeshUtil.getEventProp(handleMsgContext.getEvent()))));
HttpEntity httpEntity = new UrlEncodedFormEntity(body, Constants.DEFAULT_CHARSET);
builder.setEntity(httpEntity);
eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordPushMsg();
this.lastPushTime = System.currentTimeMillis();
addToWaitingMap(this);
CMD_LOGGER.info("cmd={}|eventMesh2client|from={}|to={}", requestCode, localAddress, currPushUrl);
try {
eventMeshHTTPServer.getHttpClientPool().getClient().execute(builder, response -> {
removeWaitingMap(AsyncHTTPPushRequest.this);
long cost = System.currentTimeMillis() - lastPushTime;
eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPPushTimeCost(cost);
if (processResponseStatus(response.getStatusLine().getStatusCode(), response)) {
// this is successful response, process response payload
String res;
try {
res = EntityUtils.toString(response.getEntity(), Charset.forName(EventMeshConstants.DEFAULT_CHARSET));
} catch (IOException e) {
LOGGER.warn("handleResponse exception", e);
handleMsgContext.finish();
return new Object();
}
ClientRetCode result = processResponseContent(res);
MESSAGE_LOGGER.info("message|eventMesh2client|{}|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}",
result, currPushUrl, handleMsgContext.getTopic(),
handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
switch (result) {
case OK:
case REMOTE_OK:
case FAIL:
complete();
if (isComplete()) {
handleMsgContext.finish();
}
break;
case RETRY:
case NOLISTEN:
delayRetry();
if (isComplete()) {
handleMsgContext.finish();
}
break;
default: // do nothing
}
} else {
eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHttpPushMsgFailed();
MESSAGE_LOGGER.info("message|eventMesh2client|exception|url={}|topic={}|bizSeqNo={}|uniqueId={}|cost={}",
currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId(), cost);
if (isComplete()) {
handleMsgContext.finish();
}
}
return new Object();
});
if (MESSAGE_LOGGER.isDebugEnabled()) {
MESSAGE_LOGGER.debug("message|eventMesh2client|url={}|topic={}|event={}",
currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getEvent());
} else {
MESSAGE_LOGGER.info("message|eventMesh2client|url={}|topic={}|bizSeqNo={}|uniqueId={}",
currPushUrl, handleMsgContext.getTopic(), handleMsgContext.getBizSeqNo(), handleMsgContext.getUniqueId());
}
} catch (IOException e) {
MESSAGE_LOGGER.error("push2client err", e);
removeWaitingMap(this);
delayRetry();
if (isComplete()) {
handleMsgContext.finish();
}
}
}