in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/SubscribeProcessor.java [64:185]
public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext)
throws Exception {
HttpCommand responseEventMeshCommand;
final HttpCommand request = asyncContext.getRequest();
final Integer requestCode = Integer.valueOf(request.getRequestCode());
final String localAddress = IPUtils.getLocalAddress();
final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("cmd={}|{}|client2eventMesh|from={}|to={}",
RequestCode.get(requestCode), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);
final SubscribeRequestHeader subscribeRequestHeader = (SubscribeRequestHeader) request.getHeader();
final SubscribeRequestBody subscribeRequestBody = (SubscribeRequestBody) request.getBody();
EventMeshHTTPConfiguration eventMeshHttpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();
final SubscribeResponseHeader subscribeResponseHeader =
SubscribeResponseHeader
.buildHeader(requestCode,
eventMeshHttpConfiguration.getEventMeshCluster(),
localAddress,
eventMeshHttpConfiguration.getEventMeshEnv(),
eventMeshHttpConfiguration.getEventMeshIDC());
// validate header
if (StringUtils.isAnyBlank(subscribeRequestHeader.getIdc(),
subscribeRequestHeader.getPid(), subscribeRequestHeader.getSys())
|| !StringUtils.isNumeric(subscribeRequestHeader.getPid())) {
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, SubscribeResponseBody.class);
return;
}
// validate body
if (StringUtils.isAnyBlank(subscribeRequestBody.getUrl(), subscribeRequestBody.getConsumerGroup())
|| CollectionUtils.isEmpty(subscribeRequestBody.getTopics())) {
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, SubscribeResponseBody.class);
return;
}
final List<SubscriptionItem> subTopicList = subscribeRequestBody.getTopics();
// do acl check
if (eventMeshHttpConfiguration.isEventMeshServerSecurityEnable()) {
for (final SubscriptionItem item : subTopicList) {
try {
this.acl.doAclCheckInHttpReceive(remoteAddr,
subscribeRequestHeader.getUsername(),
subscribeRequestHeader.getPasswd(),
subscribeRequestHeader.getSys(), item.getTopic(),
requestCode);
} catch (Exception e) {
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), SubscribeResponseBody.class);
log.warn("CLIENT HAS NO PERMISSION,SubscribeProcessor subscribe failed", e);
return;
}
}
}
final String url = subscribeRequestBody.getUrl();
final String consumerGroup = subscribeRequestBody.getConsumerGroup();
// validate URL
try {
if (!IPUtils.isValidDomainOrIp(url, eventMeshHttpConfiguration.getEventMeshIpv4BlackList(),
eventMeshHttpConfiguration.getEventMeshIpv6BlackList())) {
log.error("subscriber url {} is not valid", url);
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url,
SubscribeResponseBody.class);
return;
}
} catch (Exception e) {
log.error("subscriber url:{} is invalid.", url, e);
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR,
EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR.getErrMsg() + " invalid URL: " + url,
SubscribeResponseBody.class);
return;
}
SubscriptionManager subscriptionManager = eventMeshHTTPServer.getSubscriptionManager();
synchronized (subscriptionManager.getLocalClientInfoMapping()) {
ClientInfo clientInfo = getClientInfo(subscribeRequestHeader);
subscriptionManager.registerClient(clientInfo, consumerGroup, subTopicList, url);
subscriptionManager.updateSubscription(clientInfo, consumerGroup, url, subTopicList);
final long startTime = System.currentTimeMillis();
try {
// subscription relationship change notification
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
subscriptionManager.getLocalConsumerGroupMapping().get(consumerGroup));
final CompleteHandler<HttpCommand> handler = httpCommand -> {
try {
log.debug("{}", httpCommand);
eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPReqResTimeCost(
System.currentTimeMillis() - request.getReqTime());
} catch (Exception ex) {
log.error("onResponse error", ex);
}
};
responseEventMeshCommand = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
asyncContext.onComplete(responseEventMeshCommand, handler);
} catch (Exception e) {
completeResponse(request, asyncContext, subscribeResponseHeader,
EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR,
EventMeshRetCode.EVENTMESH_SUBSCRIBE_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
SubscribeResponseBody.class);
final long endTime = System.currentTimeMillis();
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms|topic={}|bizSeqNo={}|uniqueId={}",
endTime - startTime, JsonUtils.toJSONString(subscribeRequestBody.getTopics()), subscribeRequestBody.getUrl(), e);
eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgFailed();
eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgCost(endTime - startTime);
}
eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
}
}