in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SendSyncMessageProcessor.java [73:286]
/*
 * Handles a synchronous (request/reply) send-message HTTP command.
 *
 * Steps, in order:
 *   1. convert the inbound HttpCommand to a CloudEvent via the protocol adaptor named in the header;
 *   2. validate the event attributes, the client-instance extensions and the body extensions;
 *   3. run the ACL check when server security is enabled;
 *   4. apply the message rate limiter and the configured event-size limit;
 *   5. look up the producer for the requested producer group and issue a request/reply call.
 *
 * Every rejection is answered through completeResponse(...) with the matching EventMeshRetCode and
 * the method returns; the reply (or failure) of the request/reply call is answered from the callback.
 *
 * Parameters:
 *   ctx          - channel context the response is written to
 *   asyncContext - carries the inbound HttpCommand and is completed with the response command
 *
 * Throws: anything raised before the send is attempted and not handled below propagates to the
 * caller, for example a NumberFormatException for a non-numeric request code.
 */
public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext)
    throws Exception {
    final HttpCommand request = asyncContext.getRequest();
    final String localAddress = IPUtils.getLocalAddress();
    final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    // Parse the request code once; it is needed for logging, the response header and the ACL check.
    final int requestCode = Integer.parseInt(request.getRequestCode());

    log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
        RequestCode.get(requestCode), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);

    final SendMessageRequestHeader sendMessageRequestHeader = (SendMessageRequestHeader) request.getHeader();
    final String protocolType = sendMessageRequestHeader.getProtocolType();
    final ProtocolAdaptor<ProtocolTransportObject> httpCommandProtocolAdaptor =
        ProtocolPluginFactory.getProtocolAdaptor(protocolType);
    final CloudEvent event = httpCommandProtocolAdaptor.toCloudEvent(request);

    final EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
    final SendMessageResponseHeader sendMessageResponseHeader =
        SendMessageResponseHeader.buildHeader(
            requestCode,
            eventMeshHttpConfiguration.getEventMeshCluster(),
            localAddress,
            eventMeshHttpConfiguration.getEventMeshEnv(),
            eventMeshHttpConfiguration.getEventMeshIDC());

    // validate event
    // The null test must come first and short-circuit: dereferencing a null event while building
    // the argument list would throw instead of producing a protocol-header error response.
    if (event == null
        || !ObjectUtils.allNotNull(event.getSource(), event.getSpecVersion())
        || StringUtils.isAnyBlank(event.getId(), event.getType(), event.getSubject())) {
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
        return;
    }

    final String idc = getExtension(event, ProtocolKey.ClientInstanceKey.IDC.getKey());
    final String pid = getExtension(event, ProtocolKey.ClientInstanceKey.PID.getKey());
    final String sys = getExtension(event, ProtocolKey.ClientInstanceKey.SYS.getKey());

    // validate event-extension
    if (StringUtils.isAnyBlank(idc, pid, sys) || !StringUtils.isNumeric(pid)) {
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SendMessageResponseBody.class);
        return;
    }

    final String bizNo = getExtension(event, SendMessageRequestBody.BIZSEQNO);
    final String uniqueId = getExtension(event, SendMessageRequestBody.UNIQUEID);
    final String producerGroup = getExtension(event, SendMessageRequestBody.PRODUCERGROUP);
    final String topic = event.getSubject();
    final String ttl = getExtension(event, SendMessageRequestBody.TTL);

    // validate body
    if (StringUtils.isAnyBlank(bizNo, uniqueId, producerGroup, topic, ttl) || event.getData() == null) {
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SendMessageResponseBody.class);
        return;
    }

    // Parse ttl up front so a malformed value is rejected as a body error, instead of failing
    // inside the send path where it would be reported as a send failure and scheduled for retry.
    final int timeout;
    try {
        timeout = Integer.parseInt(ttl);
    } catch (NumberFormatException e) {
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
            "ttl is not a valid integer: " + ttl,
            SendMessageResponseBody.class);
        return;
    }

    // do acl check
    if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
        final String user = getExtension(event, ProtocolKey.ClientInstanceKey.USERNAME.getKey());
        final String pass = getExtension(event, ProtocolKey.ClientInstanceKey.PASSWD.getKey());
        try {
            this.acl.doAclCheckInHttpSend(remoteAddr, user, pass, sys, topic, requestCode);
        } catch (Exception e) {
            completeResponse(request, asyncContext, sendMessageResponseHeader,
                EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SendMessageResponseBody.class);
            log.warn("CLIENT HAS NO PERMISSION,SendSyncMessageProcessor send failed", e);
            return;
        }
    }

    final HttpMetrics summaryMetrics = eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics();

    // control flow rate limit
    if (!eventMeshHTTPServer.getMsgRateLimiter()
        .tryAcquire(EventMeshConstants.DEFAULT_FASTFAIL_TIMEOUT_IN_MILLISECONDS, TimeUnit.MILLISECONDS)) {
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_HTTP_MES_SEND_OVER_LIMIT_ERR, null, SendMessageResponseBody.class);
        summaryMetrics.recordHTTPDiscard();
        return;
    }

    // NOTE(review): the limit is compared against the character count of the decoded payload,
    // not its byte length - confirm which unit eventMeshEventSize is meant to express.
    final String content = new String(Objects.requireNonNull(event.getData()).toBytes(), Constants.DEFAULT_CHARSET);
    final int eventMeshEventSize = eventMeshHttpConfiguration.getEventMeshEventSize();
    if (content.length() > eventMeshEventSize) {
        log.error("Event size exceeds the limit: {}", eventMeshEventSize);
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
            "Event size exceeds the limit: " + eventMeshEventSize,
            SendMessageResponseBody.class);
        return;
    }

    final EventMeshProducer eventMeshProducer =
        eventMeshHTTPServer.getProducerManager().getEventMeshProducer(producerGroup);
    if (!eventMeshProducer.isStarted()) {
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_GROUP_PRODUCER_STOPED_ERR, null, SendMessageResponseBody.class);
        return;
    }

    // Copy the event and stamp it with the message type and the request-side timestamps.
    CloudEvent newEvent;
    try {
        newEvent = CloudEventBuilder.from(event)
            .withExtension(EventMeshConstants.MSG_TYPE, EventMeshConstants.PERSISTENT)
            .withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
            .withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(System.currentTimeMillis()))
            .build();
        log.debug("msg2MQMsg suc, bizSeqNo={}, topic={}", bizNo, topic);
    } catch (Exception e) {
        log.error("msg2MQMsg err, bizSeqNo={}, topic={}", bizNo, topic, e);
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_PACKAGE_MSG_ERR, null, SendMessageResponseBody.class);
        return;
    }

    final SendMessageContext sendMessageContext =
        new SendMessageContext(bizNo, newEvent, eventMeshProducer, eventMeshHTTPServer);
    summaryMetrics.recordSendMsg();

    final long startTime = System.currentTimeMillis();

    // Writes the completed command back on the channel and records the request/response time cost.
    // Failures while responding are logged and otherwise ignored.
    final CompleteHandler<HttpCommand> handler = httpCommand -> {
        try {
            log.debug("{}", httpCommand);
            eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
            eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPReqResTimeCost(
                System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
        } catch (Exception ex) {
            log.error("onResponse error", ex);
            // ignore
        }
    };

    try {
        eventMeshProducer.request(sendMessageContext, new RequestReplyCallback() {

            @Override
            public void onSuccess(final CloudEvent replyEvent) {
                log.info("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
                    + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime, topic, bizNo, uniqueId);
                try {
                    // Stamp the reply with the response-side timestamps before returning it.
                    final CloudEvent stampedReply = CloudEventBuilder.from(replyEvent)
                        .withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP,
                            String.valueOf(System.currentTimeMillis()))
                        .withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP,
                            String.valueOf(System.currentTimeMillis()))
                        .build();
                    final String rtnMsg = new String(Objects.requireNonNull(stampedReply.getData()).toBytes(),
                        Constants.DEFAULT_CHARSET);
                    final HttpCommand succ = request.createHttpCommandResponse(
                        sendMessageResponseHeader,
                        SendMessageResponseBody.buildBody(EventMeshRetCode.SUCCESS.getRetCode(),
                            JsonUtils.toJSONString(SendMessageResponseBody.ReplyMessage.builder()
                                .topic(topic)
                                .body(rtnMsg)
                                .properties(EventMeshUtil.getEventProp(stampedReply))
                                .build())));
                    asyncContext.onComplete(succ, handler);
                } catch (Exception ex) {
                    // Building the success response failed (e.g. the reply has no data); answer with an error.
                    final HttpCommand err = request.createHttpCommandResponse(
                        sendMessageResponseHeader,
                        SendMessageResponseBody.buildBody(
                            EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
                            EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
                                + EventMeshUtil.stackTrace(ex, 2)));
                    asyncContext.onComplete(err, handler);
                    log.warn("message|mq2eventMesh|RSP", ex);
                }
            }

            @Override
            public void onException(final Throwable e) {
                final HttpCommand err = request.createHttpCommandResponse(
                    sendMessageResponseHeader,
                    SendMessageResponseBody.buildBody(
                        EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getRetCode(),
                        EventMeshRetCode.EVENTMESH_WAITING_RR_MSG_ERR.getErrMsg()
                            + EventMeshUtil.stackTrace(e, 2)));
                asyncContext.onComplete(err, handler);
                // The client is answered first; the message is then handed to the retryer.
                eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
                log.error("message|mq2eventMesh|RSP|SYNC|rrCost={}ms|topic={}"
                    + "|bizSeqNo={}|uniqueId={}", System.currentTimeMillis() - startTime, topic, bizNo, uniqueId, e);
            }
        }, timeout);
    } catch (Exception ex) {
        // Issuing the request itself failed: answer with a send error, schedule a retry and
        // record the failure and elapsed time in the metrics.
        completeResponse(request, asyncContext, sendMessageResponseHeader,
            EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR,
            EventMeshRetCode.EVENTMESH_SEND_SYNC_MSG_ERR.getErrMsg() + EventMeshUtil.stackTrace(ex, 2),
            SendMessageResponseBody.class);
        eventMeshHTTPServer.getHttpRetryer().newTimeout(sendMessageContext, 10, TimeUnit.SECONDS);
        final long endTime = System.currentTimeMillis();
        summaryMetrics.recordSendMsgFailed();
        summaryMetrics.recordSendMsgCost(endTime - startTime);
        log.error("message|eventMesh2mq|REQ|SYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
            endTime - startTime, topic, bizNo, uniqueId, ex);
    }
}