in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageProcessor.java [86:290]
public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext) throws Exception {
String localAddress = IPUtils.getLocalAddress();
HttpCommand request = asyncContext.getRequest();
CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(Integer.valueOf(request.getRequestCode())),
EventMeshConstants.PROTOCOL_HTTP, RemotingHelper.parseChannelRemoteAddr(ctx.channel()), localAddress);
SendMessageBatchRequestHeader sendMessageBatchRequestHeader = (SendMessageBatchRequestHeader) request.getHeader();
EventMeshHTTPConfiguration httpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
SendMessageBatchResponseHeader sendMessageBatchResponseHeader = SendMessageBatchResponseHeader.buildHeader(
Integer.valueOf(request.getRequestCode()), httpConfiguration.getEventMeshCluster(), localAddress, httpConfiguration.getEventMeshEnv(),
httpConfiguration.getEventMeshIDC());
String protocolType = sendMessageBatchRequestHeader.getProtocolType();
ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(protocolType);
List<CloudEvent> eventList = httpCommandProtocolAdaptor.toBatchCloudEvent(request);
if (CollectionUtils.isEmpty(eventList)) {
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
return;
}
String batchId = "";
String producerGroup = "";
int eventSize = eventList.size();
if (eventSize > httpConfiguration.getEventMeshEventBatchSize()) {
BATCH_MSG_LOGGER.error("Event batch size exceeds the limit: {}", httpConfiguration.getEventMeshEventBatchSize());
completeResponse(request, asyncContext, sendMessageBatchResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
"Event batch size exceeds the limit: " + httpConfiguration.getEventMeshEventBatchSize(), SendMessageBatchResponseBody.class);
return;
}
for (CloudEvent event : eventList) {
// validate event
if (!ObjectUtils.allNotNull(event.getSource(), event.getSpecVersion())
|| StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchResponseBody.class);
return;
}
String content = event.getData() == null ? "" : new String(event.getData().toBytes(), Constants.DEFAULT_CHARSET);
if (content.length() > httpConfiguration.getEventMeshEventSize()) {
BATCH_MSG_LOGGER.error("Event size exceeds the limit: {}", httpConfiguration.getEventMeshEventSize());
completeResponse(request, asyncContext, sendMessageBatchResponseHeader, EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR,
"Event size exceeds the limit: " + httpConfiguration.getEventMeshEventSize(), SendMessageBatchResponseBody.class);
return;
}
String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey());
String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());
// validate event-extension
if (StringUtils.isAnyBlank(idc, pid, sys) || !StringUtils.isNumeric(pid)) {
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchResponseBody.class);
return;
}
batchId = getExtension(event, SendMessageBatchRequestBody.BATCHID);
producerGroup = getExtension(event, SendMessageBatchRequestBody.PRODUCERGROUP);
eventSize = Integer.parseInt(getExtension(event, SendMessageBatchRequestBody.SIZE));
CloudEventData eventData = event.getData();
if (eventData == null || StringUtils.isAnyBlank(batchId, producerGroup) || eventSize != eventList.size()) {
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
return;
}
}
HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();
if (!eventMeshHTTPServer.getBatchRateLimiter()
.tryAcquire(eventSize, EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
summaryMetrics.recordSendBatchMsgDiscard(eventSize);
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR, null, SendMessageBatchResponseBody.class);
return;
}
EventMeshProducer batchEventMeshProducer = eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
if (!batchEventMeshProducer.isStarted()) {
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR, null, SendMessageBatchResponseBody.class);
return;
}
final Stopwatch stopwatch = Stopwatch.createStarted();
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
int requestCode = Integer.parseInt(request.getRequestCode());
Map<String, List<CloudEvent>> topicBatchMessageMappings = new ConcurrentHashMap<>();
for (CloudEvent cloudEvent : eventList) {
if (StringUtils.isBlank(cloudEvent.getSubject()) || cloudEvent.getData() == null) {
continue;
}
String user = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
String pass = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
String subsystem = getExtension(cloudEvent, ProtocolKey.ClientInstanceKey.SYS.getKey());
// do acl check
if (httpConfiguration.isEventMeshServerSecurityEnable()) {
try {
this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, cloudEvent.getSubject(), requestCode);
} catch (Exception e) {
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageBatchResponseBody.class);
ACL_LOGGER.warn("CLIENT HAS NO PERMISSION,BatchSendMessageProcessor send failed", e);
return;
}
}
try {
String ttl = getExtension(cloudEvent, SendMessageRequestBody.TTL);
if (StringUtils.isBlank(ttl) || !StringUtils.isNumeric(ttl)) {
cloudEvent = CloudEventBuilder.from(cloudEvent)
.withExtension(SendMessageRequestBody.TTL, String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS))
.withExtension("msgtype", "persistent")
.build();
}
if (topicBatchMessageMappings.containsKey(cloudEvent.getSubject())) {
topicBatchMessageMappings.get(cloudEvent.getSubject()).add(cloudEvent);
} else {
List<CloudEvent> tmp = new ArrayList<>();
tmp.add(cloudEvent);
topicBatchMessageMappings.put(cloudEvent.getSubject(), tmp);
}
BATCH_MSG_LOGGER.debug("msg2MQMsg suc, event:{}", cloudEvent.getData());
} catch (Exception e) {
BATCH_MSG_LOGGER.error("msg2MQMsg err, event:{}", cloudEvent.getData(), e);
}
}
if (CollectionUtils.isEmpty(eventList)) {
completeResponse(request, asyncContext, sendMessageBatchResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchResponseBody.class);
return;
}
long delta = eventSize;
summaryMetrics.recordSendBatchMsg(delta);
if (httpConfiguration.isEventMeshServerBatchMsgBatchEnabled()) {
for (List<CloudEvent> eventlist : topicBatchMessageMappings.values()) {
// TODO: Implementation in API. Consider whether to put it in the plug-in.
CloudEvent event = null;
// TODO: Detect the maximum length of messages for different producers.
final SendMessageContext sendMessageContext = new SendMessageContext(batchId, event, batchEventMeshProducer, eventMeshHTTPServer);
batchEventMeshProducer.send(sendMessageContext, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(OnExceptionContext context) {
BATCH_MSG_LOGGER.warn("", context.getException());
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
}
});
}
} else {
for (CloudEvent event : eventList) {
final SendMessageContext sendMessageContext = new SendMessageContext(batchId, event, batchEventMeshProducer, eventMeshHTTPServer);
batchEventMeshProducer.send(sendMessageContext, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
}
@Override
public void onException(OnExceptionContext context) {
BATCH_MSG_LOGGER.warn("", context.getException());
eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
}
});
}
}
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
summaryMetrics.recordBatchSendMsgCost(elapsed);
BATCH_MSG_LOGGER.debug("batchMessage|eventMesh2mq|REQ|ASYNC|batchId={}|send2MQCost={}ms|msgNum={}|topics={}",
batchId, elapsed, eventSize, topicBatchMessageMappings.keySet());
completeResponse(request, asyncContext, sendMessageBatchResponseHeader, EventMeshRetCode.SUCCESS, null,
SendMessageBatchResponseBody.class);
return;
}