in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/BatchSendMessageV2Processor.java [79:256]
/**
 * Processes a batch-send (V2) HTTP command.
 *
 * <p>The request is converted to a {@link CloudEvent} through the protocol adaptor selected by the
 * request header's protocol type, then passes through the following gates in order. The first gate
 * that fails completes the response with the matching {@link EventMeshRetCode} and stops processing:
 * <ol>
 *   <li>mandatory CloudEvent attributes (source, spec version, id, type, subject);</li>
 *   <li>client-instance extensions (idc, pid, sys; pid must be numeric);</li>
 *   <li>body fields (bizSeqNo, producer group, topic, data) and the configured event-size limit;</li>
 *   <li>ACL check, only when server security is enabled;</li>
 *   <li>batch rate limiter;</li>
 *   <li>producer for the producer group must be started.</li>
 * </ol>
 * The event is then enriched with TTL / msgtype / timestamp extensions and handed to the producer.
 * A {@code SUCCESS} response is written once the send has been submitted without throwing; the
 * asynchronous send result only affects metrics, logging and retry scheduling.
 *
 * @param ctx          channel context of the incoming request; used here to resolve the remote address
 * @param asyncContext carries the {@link HttpCommand} request and receives the response
 * @throws Exception if any call that is not individually guarded below fails
 *                   (for example the protocol adaptor lookup or the CloudEvent conversion)
 */
public void processRequest(ChannelHandlerContext ctx, AsyncContext<HttpCommand> asyncContext)
    throws Exception {
    final HttpCommand request = asyncContext.getRequest();
    final Integer requestCode = Integer.valueOf(request.getRequestCode());

    // NOTE(review): both "from" and "to" are filled with the remote address; "to" was presumably
    // meant to be this server's own address - confirm and adjust.
    CMD_LOGGER.info("cmd={}|{}|client2eventMesh|from={}|to={}",
        RequestCode.get(requestCode),
        EventMeshConstants.PROTOCOL_HTTP,
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
        RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

    SendMessageBatchV2RequestHeader sendMessageBatchV2RequestHeader =
        (SendMessageBatchV2RequestHeader) request.getHeader();

    String protocolType = sendMessageBatchV2RequestHeader.getProtocolType();
    ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
        ProtocolPluginFactory.getProtocolAdaptor(protocolType);
    CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(request);

    EventMeshHTTPConfiguration httpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
    SendMessageBatchV2ResponseHeader sendMessageBatchV2ResponseHeader =
        SendMessageBatchV2ResponseHeader.buildHeader(
            requestCode,
            httpConfiguration.getEventMeshCluster(),
            httpConfiguration.getEventMeshEnv(),
            httpConfiguration.getEventMeshIDC());

    // todo: use validate processor to check
    // validate event
    if (!ObjectUtils.allNotNull(event.getSource(), event.getSpecVersion())
        || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchV2ResponseBody.class);
        return;
    }

    String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey());
    String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
    String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());

    // validate event-extension
    if (StringUtils.isAnyBlank(idc, pid, sys)
        || !StringUtils.isNumeric(pid)) {
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageBatchV2ResponseBody.class);
        return;
    }

    String bizNo = getExtension(event, SendMessageBatchV2RequestBody.BIZSEQNO);
    String producerGroup = getExtension(event, SendMessageBatchV2RequestBody.PRODUCERGROUP);
    String topic = event.getSubject();

    // validate body fields
    if (StringUtils.isAnyBlank(bizNo, topic, producerGroup)
        || event.getData() == null) {
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageBatchV2ResponseBody.class);
        return;
    }

    // NOTE(review): the limit is compared against the decoded character count, not the raw byte
    // length of the payload - confirm which unit getEventMeshEventSize() is meant to express.
    String content = new String(Objects.requireNonNull(event.getData()).toBytes(), Constants.DEFAULT_CHARSET);
    if (content.length() > httpConfiguration.getEventMeshEventSize()) {
        BATCH_MESSAGE_LOGGER.error("Event size exceeds the limit: {}", httpConfiguration.getEventMeshEventSize());
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
            "Event size exceeds the limit: " + httpConfiguration.getEventMeshEventSize(),
            SendMessageBatchV2ResponseBody.class);
        return;
    }

    // do acl check
    if (httpConfiguration.isEventMeshServerSecurityEnable()) {
        String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
        String user = getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
        String pass = getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
        String subsystem = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());
        try {
            this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, subsystem, topic, requestCode);
        } catch (Exception e) {
            completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
                EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageBatchV2ResponseBody.class);
            ACL_LOGGER.warn("CLIENT HAS NO PERMISSION,BatchSendMessageV2Processor send failed", e);
            return;
        }
    }

    HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();

    // fast-fail when the batch rate limiter cannot hand out a permit within the timeout
    if (!eventMeshHTTPServer.getBatchRateLimiter()
        .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
        summaryMetrics.recordSendBatchMsgDiscard(1);
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
            EventMeshRetCode.EVENTMESH_BATCH_SPEED_OVER_LIMIT_ERR, null, SendMessageBatchV2ResponseBody.class);
        return;
    }

    EventMeshProducer batchEventMeshProducer =
        eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
    batchEventMeshProducer.getMqProducerWrapper().getMeshMQProducer().setExtFields();
    if (!batchEventMeshProducer.isStarted()) {
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
            EventMeshRetCode.EVENTMESH_BATCH_PRODUCER_STOPED_ERR, null, SendMessageBatchV2ResponseBody.class);
        return;
    }

    long batchStartTime = System.currentTimeMillis();
    String defaultTTL = String.valueOf(EventMeshConstants.DEFAULT_MSG_TTL_MILLS);
    // todo: use hashmap to avoid copy
    String ttlValue = getExtension(event, SendMessageRequestBody.TTL);
    // Fall back to the default TTL when the client supplied none OR supplied a non-numeric one.
    // (The previous "&&" only ever matched the blank case, letting malformed TTLs through.)
    if (StringUtils.isBlank(ttlValue) || !StringUtils.isNumeric(ttlValue)) {
        event = CloudEventBuilder.from(event).withExtension(SendMessageRequestBody.TTL, defaultTTL)
            .build();
    }

    try {
        event = CloudEventBuilder.from(event)
            .withExtension("msgtype", "persistent")
            .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP,
                String.valueOf(System.currentTimeMillis()))
            .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP,
                String.valueOf(System.currentTimeMillis()))
            .build();
        BATCH_MESSAGE_LOGGER.debug("msg2MQMsg suc, topic:{}, msg:{}", topic, event.getData());
    } catch (Exception e) {
        BATCH_MESSAGE_LOGGER.error("msg2MQMsg err, topic:{}, msg:{}", topic, event.getData(), e);
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader, EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR,
            EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR.getErrMsg()
                +
                EventMeshUtil.stackTrace(e, 2),
            SendMessageBatchV2ResponseBody.class);
        return;
    }

    summaryMetrics.recordSendBatchMsg(1);

    final SendMessageContext sendMessageContext =
        new SendMessageContext(bizNo, event, batchEventMeshProducer, eventMeshHTTPServer);

    try {
        batchEventMeshProducer.send(sendMessageContext, new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                long batchEndTime = System.currentTimeMillis();
                summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
                BATCH_MESSAGE_LOGGER.debug(
                    "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
                    bizNo, batchEndTime - batchStartTime, topic);
            }

            @Override
            public void onException(OnExceptionContext context) {
                long batchEndTime = System.currentTimeMillis();
                // hand the failed message to the HTTP retryer, scheduled 10 seconds out
                eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
                BATCH_MESSAGE_LOGGER.error(
                    "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
                    bizNo, batchEndTime - batchStartTime, topic, context.getException());
            }
        });
    } catch (Exception e) {
        completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader, EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR,
            EventMeshRetCode.EVENTMESH_SEND_BATCHLOG_MSG_ERR.getErrMsg()
                +
                EventMeshUtil.stackTrace(e, 2),
            SendMessageBatchV2ResponseBody.class);
        long batchEndTime = System.currentTimeMillis();
        eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
        summaryMetrics.recordBatchSendMsgCost(batchEndTime - batchStartTime);
        BATCH_MESSAGE_LOGGER.error(
            "batchMessageV2|eventMesh2mq|REQ|ASYNC|bizSeqNo={}|send2MQCost={}ms|topic={}",
            bizNo, batchEndTime - batchStartTime, topic, e);
        // The error response has already been written above; do not fall through and
        // complete the same request a second time with SUCCESS.
        return;
    }

    completeResponse(request, asyncContext, sendMessageBatchV2ResponseHeader,
        EventMeshRetCode.SUCCESS, null, SendMessageBatchV2ResponseBody.class);
}