in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncEventProcessor.java [80:307]
/**
 * Handles an asynchronous "send event" HTTP request.
 *
 * <p>Processing steps, in order:
 * <ol>
 *   <li>enrich the request headers (client IP, system headers, CloudEvents attributes, bizSeqNo/uniqueId/ttl defaults);</li>
 *   <li>convert the request into a {@link CloudEvent} through the protocol adaptor selected by the protocol-type header;</li>
 *   <li>validate the event, its required extensions and its body, answering with an error response on failure;</li>
 *   <li>run the ACL check when server security is enabled, then the send rate limiter;</li>
 *   <li>resolve the producer for the producer group, check that it is started and check the event size;</li>
 *   <li>apply the optional filter and transformer registered for {@code producerGroup-topic} and send the event,
 *       replying to the client from the send callback.</li>
 * </ol>
 *
 * <p>Missing extensions (idc, pid, sys, producer group, token) are treated as blank values so that they are
 * reported through the regular validation error responses instead of surfacing as a {@link NullPointerException}.
 *
 * @param handlerSpecific per-request handle giving access to the async context, the channel context,
 *                        the trace operation and the response helpers
 * @param httpRequest     the raw HTTP request; not read by this method
 * @throws Exception if any step outside the guarded send section fails unexpectedly
 */
public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest) throws Exception {
    final AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
    final ChannelHandlerContext ctx = handlerSpecific.getCtx();
    final HttpEventWrapper requestWrapper = asyncContext.getRequest();
    final String localAddress = IPUtils.getLocalAddress();
    // Resolve the remote address once; it is reused for logging, the client IP header and the ACL check.
    final String source = RemotingHelper.parseChannelRemoteAddr(ctx.channel());

    log.info("uri={}|{}|client2eventMesh|from={}|to={}",
        requestWrapper.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, source, localAddress);

    // user request header
    final Map<String, Object> requestHeaderMap = requestWrapper.getHeaderMap();
    requestHeaderMap.put(ProtocolKey.ClientInstanceKey.IP.getKey(), source);

    // build sys header
    requestWrapper.buildSysHeaderForClient();

    // build cloudevents attributes
    requestHeaderMap.putIfAbsent("source", source);
    requestWrapper.buildSysHeaderForCE();

    // Fall back to generated identifiers / a default TTL when the client did not supply them.
    final String bizNo = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(),
        RandomStringUtils.generateNum(32)).toString();
    final String uniqueId = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(),
        RandomStringUtils.generateNum(32)).toString();
    final String ttl = requestHeaderMap.getOrDefault(Constants.EVENTMESH_MESSAGE_CONST_TTL,
        14400000).toString();

    requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(), bizNo);
    requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(), uniqueId);
    requestWrapper.getSysHeaderMap().putIfAbsent(Constants.EVENTMESH_MESSAGE_CONST_TTL, ttl);

    final Map<String, Object> responseHeaderMap = new HashMap<>();
    responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
    responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
    responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP,
        localAddress);
    responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
    responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
        eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());

    final Map<String, Object> responseBodyMap = new HashMap<>();

    final String protocolType = requestHeaderMap.getOrDefault(ProtocolKey.PROTOCOL_TYPE,
        "http").toString();
    final ProtocolAdaptor<ProtocolTransportObject> httpProtocolAdaptor =
        ProtocolPluginFactory.getProtocolAdaptor(protocolType);

    CloudEvent event = httpProtocolAdaptor.toCloudEvent(requestWrapper);

    // validate event
    if (event == null
        || event.getSource() == null
        || event.getSpecVersion() == null
        || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
            responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        return;
    }

    // Read extensions null-safely: an absent extension becomes null and is rejected by the blank checks below.
    final String idc = Objects.toString(event.getExtension(ProtocolKey.ClientInstanceKey.IDC.getKey()), null);
    final String pid = Objects.toString(event.getExtension(ProtocolKey.ClientInstanceKey.PID.getKey()), null);
    final String sys = Objects.toString(event.getExtension(ProtocolKey.ClientInstanceKey.SYS.getKey()), null);

    // validate event-extension
    if (StringUtils.isAnyBlank(idc, pid, sys)
        || !StringUtils.isNumeric(pid)) {
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
            responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        return;
    }

    final String producerGroup =
        Objects.toString(event.getExtension(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey()), null);
    final String topic = event.getSubject();

    // validate body
    if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic)
        || event.getData() == null) {
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
            responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        return;
    }

    // Filter and transformer are looked up only after producerGroup and topic are known to be non-blank.
    final String groupTopicKey = producerGroup + "-" + topic;
    final Pattern filterPattern = eventMeshHTTPServer.getFilterEngine().getFilterPattern(groupTopicKey);
    final Transformer transformer = eventMeshHTTPServer.getTransformerEngine().getTransformer(groupTopicKey);

    // The token may be absent; a blank token selects the token-less producer lookup further down.
    final String token = Objects.toString(event.getExtension(ProtocolKey.ClientInstanceKey.TOKEN.getKey()), null);

    // do acl check
    if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
        final String requestURI = requestWrapper.getRequestURI();
        try {
            EventMeshServicePubTopicInfo eventMeshServicePubTopicInfo = eventMeshHTTPServer.getEventMeshServer()
                .getProducerTopicManager().getEventMeshServicePubTopicInfo(producerGroup);
            if (eventMeshServicePubTopicInfo == null) {
                throw new AclException("no group register");
            }
            this.acl.doAclCheckInHttpSend(source, token, sys, topic, requestURI, eventMeshServicePubTopicInfo);
        } catch (Exception e) {
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
                responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
            log.warn("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
            return;
        }
    }

    // control flow rate limit
    if (!eventMeshHTTPServer.getMsgRateLimiter()
        .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, responseHeaderMap,
            responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        return;
    }

    final EventMeshProducer eventMeshProducer;
    if (StringUtils.isNotBlank(token)) {
        eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup, token);
    } else {
        eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
    }

    if (!eventMeshProducer.isStarted()) {
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
            responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        return;
    }

    // The size limit is compared against the character count of the UTF-8 decoded payload.
    final String content = new String(Objects.requireNonNull(event.getData()).toBytes(), StandardCharsets.UTF_8);
    if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize()) {
        log.error("Event size exceeds the limit: {}", eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize());
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR, responseHeaderMap,
            responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        return;
    }

    // Stamp the message type and the request timestamps onto the event before it is handed to the producer.
    try {
        event = CloudEventBuilder.from(event)
            .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
            .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
            .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
            .build();
        log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
    } catch (Exception e) {
        log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, responseHeaderMap,
            responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        return;
    }

    final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, eventMeshProducer, eventMeshHTTPServer);
    eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsg();

    final long startTime = System.currentTimeMillis();
    // true means the event passed the filter (or no filter is configured) and should be sent.
    boolean isFiltered = true;
    try {
        // Note: this refreshed copy is used for tracing, filtering and transforming; it is stored back into
        // sendMessageContext only when a transformer is applied.
        event = CloudEventBuilder.from(sendMessageContext.getEvent())
            .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
            .build();
        handlerSpecific.getTraceOperation().createClientTraceOperation(EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event),
            EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false);

        if (filterPattern != null) {
            isFiltered = filterPattern.filter(JsonUtils.toJSONString(event));
        }

        // apply transformer
        if (isFiltered && transformer != null) {
            String data = transformer.transform(JsonUtils.toJSONString(event));
            event = CloudEventBuilder.from(event).withData(Objects.requireNonNull(JsonUtils.toJSONString(data))
                .getBytes(StandardCharsets.UTF_8)).build();
            sendMessageContext.setEvent(event);
        }

        if (isFiltered) {
            eventMeshProducer.send(sendMessageContext, new SendCallback() {

                @Override
                public void onSuccess(final SendResult sendResult) {
                    responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
                    responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);

                    log.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                        System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
                    handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
                    handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                }

                @Override
                public void onException(final OnExceptionContext context) {
                    responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
                    responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
                        + EventMeshUtil.stackTrace(context.getException(), 2));
                    // Hand the message to the retryer in addition to reporting the failure to the client.
                    eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                    handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
                        EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), sendMessageContext.getEvent()));

                    handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
                    log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
                        System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, context.getException());
                }
            });
        } else {
            // The filter rejected the event: nothing is sent and the client gets a filter error.
            log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}|apply filter failed",
                System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
            handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
            handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_FILTER_MSG_ERR, responseHeaderMap, responseBodyMap,
                EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
        }
    } catch (Exception ex) {
        // Any failure in the trace/filter/transform/send section schedules a retry and reports a send error.
        eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
        handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, responseHeaderMap, responseBodyMap, null);

        final long endTime = System.currentTimeMillis();
        log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
            endTime - startTime, topic, bizNo, uniqueId, ex);
    }
}