in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendAsyncRemoteEventProcessor.java [81:315]
public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest) throws Exception {
final AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
final ChannelHandlerContext ctx = handlerSpecific.getCtx();
final HttpEventWrapper requestWrapper = asyncContext.getRequest();
final String localAddress = IPUtils.getLocalAddress();
log.info("uri={}|{}|client2eventMesh|from={}|to={}",
requestWrapper.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);
// user request header
final Map<String, Object> requestHeaderMap = requestWrapper.getHeaderMap();
final String source = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
final String env = eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv();
final String meshGroup = new StringBuilder()
.append(env)
.append('-')
.append(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC())
.append('-')
.append(eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster())
.append('-')
.append(eventMeshHTTPServer.getEventMeshHttpConfiguration().getSysID())
.toString();
requestHeaderMap.put(ProtocolKey.ClientInstanceKey.IP.getKey(), source);
requestHeaderMap.put(ProtocolKey.ClientInstanceKey.ENV.getKey(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
requestHeaderMap.put(ProtocolKey.ClientInstanceKey.IDC.getKey(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
requestHeaderMap.put(ProtocolKey.ClientInstanceKey.SYS.getKey(),
eventMeshHTTPServer.getEventMeshHttpConfiguration().getSysID());
requestHeaderMap.put(ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey(), meshGroup);
// build sys header
requestWrapper.buildSysHeaderForClient();
// build cloudevents attributes
requestHeaderMap.putIfAbsent("source", source);
requestWrapper.buildSysHeaderForCE();
// process remote event body
final Map<String, Object> bodyMap = Optional.ofNullable(JsonUtils.parseTypeReferenceObject(
new String(requestWrapper.getBody(), Constants.DEFAULT_CHARSET),
new TypeReference<Map<String, Object>>() {
}
)).orElseGet(Maps::newHashMap);
requestWrapper.setBody(bodyMap.get("content").toString().getBytes(StandardCharsets.UTF_8));
final String bizNo = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(),
RandomStringUtils.generateNum(30)).toString();
final String uniqueId = requestHeaderMap.getOrDefault(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(),
RandomStringUtils.generateNum(30)).toString();
final String ttl = requestHeaderMap.getOrDefault(Constants.EVENTMESH_MESSAGE_CONST_TTL,
4 * 1000).toString();
requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.BIZSEQNO.getKey(), bizNo);
requestWrapper.getSysHeaderMap().putIfAbsent(ProtocolKey.ClientInstanceKey.UNIQUEID.getKey(), uniqueId);
requestWrapper.getSysHeaderMap().putIfAbsent(Constants.EVENTMESH_MESSAGE_CONST_TTL, ttl);
final Map<String, Object> responseHeaderMap = new HashMap<>();
responseHeaderMap.put(ProtocolKey.REQUEST_URI, requestWrapper.getRequestURI());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHCLUSTER,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshCluster());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIP, localAddress);
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHENV,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEnv());
responseHeaderMap.put(ProtocolKey.EventMeshInstanceKey.EVENTMESHIDC,
eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshIDC());
final Map<String, Object> responseBodyMap = new HashMap<>();
final Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
final Iterator<Map.Entry<String, Object>> it = requestHeaderMap.entrySet().iterator();
while (it.hasNext()) {
final String key = it.next().getKey();
if (sysHeaderMap.containsKey(key)) {
it.remove();
}
}
final String protocolType = requestHeaderMap.getOrDefault(ProtocolKey.PROTOCOL_TYPE, "http").toString();
final ProtocolAdaptor<ProtocolTransportObject> httpProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
CloudEvent event = httpProtocolAdaptor.toCloudEvent(requestWrapper);
// validate event
if (event == null
|| StringUtils.isBlank(event.getId())
|| event.getSource() == null
|| event.getSpecVersion() == null
|| StringUtils.isBlank(event.getType())
|| StringUtils.isBlank(event.getSubject())) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}
final String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
final String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());
// validate event-extension
if (StringUtils.isBlank(getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey()))
|| StringUtils.isBlank(pid)
|| !StringUtils.isNumeric(pid)
|| StringUtils.isBlank(sys)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}
final String producerGroup = getExtension(event, ProtocolKey.ClientInstanceKey.PRODUCERGROUP.getKey());
final String topic = event.getSubject();
// validate body
if (StringUtils.isBlank(bizNo)
|| StringUtils.isBlank(uniqueId)
|| StringUtils.isBlank(producerGroup)
|| StringUtils.isBlank(topic)
|| event.getData() == null) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}
// do acl check
if (eventMeshHTTPServer.getEventMeshHttpConfiguration().isEventMeshServerSecurityEnable()) {
try {
this.acl.doAclCheckInHttpSend(RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME.getKey()),
getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD.getKey()),
getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey()),
topic,
requestWrapper.getRequestURI());
} catch (Exception e) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_ACL_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
log.error("CLIENT HAS NO PERMISSION,SendAsyncMessageProcessor send failed", e);
return;
}
}
// control flow rate limit
if (!eventMeshHTTPServer.getMsgRateLimiter()
.tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}
final EventMeshProducer eventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
if (!eventMeshProducer.isStarted()) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}
final String content = event.getData() == null ? "" : new String(event.getData().toBytes(), StandardCharsets.UTF_8);
if (content.length() > eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize()) {
log.error("Event size exceeds the limit: {}", eventMeshHTTPServer.getEventMeshHttpConfiguration().getEventMeshEventSize());
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_SIZE_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}
try {
event = CloudEventBuilder.from(event)
.withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
.withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
} catch (Exception e) {
log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, responseHeaderMap,
responseBodyMap, EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event));
return;
}
final SendMessageContext sendMessageContext = new SendMessageContext(bizNo, event, eventMeshProducer, eventMeshHTTPServer);
eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsg();
final long startTime = System.currentTimeMillis();
try {
event = CloudEventBuilder.from(sendMessageContext.getEvent())
.withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
.build();
handlerSpecific.getTraceOperation().createClientTraceOperation(EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), event),
EventMeshTraceConstants.TRACE_UPSTREAM_EVENTMESH_CLIENT_SPAN, false);
eventMeshProducer.send(sendMessageContext, new SendCallback() {
@Override
public void onSuccess(final SendResult sendResult) {
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg() + sendResult);
log.info("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
handlerSpecific.getTraceOperation().endLatestTrace(sendMessageContext.getEvent());
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
}
@Override
public void onException(final OnExceptionContext context) {
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR.getErrMsg()
+ EventMeshUtil.stackTrace(context.getException(), 2));
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
handlerSpecific.getTraceOperation().exceptionLatestTrace(context.getException(),
EventMeshUtil.getCloudEventExtensionMap(SpecVersion.V1.toString(), sendMessageContext.getEvent()));
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, context.getException());
}
});
} catch (Exception ex) {
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_SEND_ASYNC_MSG_ERR, responseHeaderMap,
responseBodyMap, null);
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, ex);
}
}