in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/HeartBeatProcessor.java [66:183]
/**
 * Processes an HTTP heartbeat request.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Validate the request header (idc, pid and sys must be non-blank, pid must be numeric);
 *       on failure the request is completed with {@code EVENTMESH_PROTOCOL_HEADER_ERR}.</li>
 *   <li>Validate the request body (clientType, consumerGroup non-blank and at least one heartbeat
 *       entity); on failure the request is completed with {@code EVENTMESH_PROTOCOL_BODY_ERR}.</li>
 *   <li>Build one {@code Client} per heartbeat entity that has a non-blank topic and url, running
 *       the ACL check per topic when server security is enabled. The first ACL failure completes
 *       the request with {@code EVENTMESH_ACL_ERR} and nothing is registered.</li>
 *   <li>Merge the collected clients into the subscription manager's local client info mapping,
 *       keyed by {@code consumerGroup@topic}.</li>
 *   <li>Complete the request with a {@code SUCCESS} response; if that fails, complete it with
 *       {@code EVENTMESH_HEARTBEAT_ERR} and record failure metrics.</li>
 * </ol>
 *
 * @param ctx          the channel context used to resolve the remote address and to send the response
 * @param asyncContext the async context carrying the heartbeat {@code HttpCommand}
 * @throws Exception if any step outside the guarded sections fails
 */
public void processRequest(final ChannelHandlerContext ctx, final AsyncContext<HttpCommand> asyncContext) throws Exception {
    final String localAddress = IPUtils.getLocalAddress();
    final HttpCommand request = asyncContext.getRequest();
    // Parsed once and reused for both the access log and the response header.
    final Integer requestCode = Integer.valueOf(request.getRequestCode());
    // Resolved once and reused for both the access log and every ACL check.
    final String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());

    log.info("cmd={}|{}|client2eventMesh|from={}|to={}", RequestCode.get(requestCode),
        EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);

    final HeartbeatRequestHeader heartbeatRequestHeader = (HeartbeatRequestHeader) request.getHeader();
    final HeartbeatRequestBody heartbeatRequestBody = (HeartbeatRequestBody) request.getBody();
    final EventMeshHTTPConfiguration httpConfiguration = eventMeshHTTPServer.getEventMeshHttpConfiguration();

    final HeartbeatResponseHeader heartbeatResponseHeader =
        HeartbeatResponseHeader.buildHeader(requestCode,
            httpConfiguration.getEventMeshCluster(),
            localAddress, httpConfiguration.getEventMeshEnv(),
            httpConfiguration.getEventMeshIDC());

    // validate header
    if (StringUtils.isAnyBlank(
        heartbeatRequestHeader.getIdc(), heartbeatRequestHeader.getPid(), heartbeatRequestHeader.getSys())
        || !StringUtils.isNumeric(heartbeatRequestHeader.getPid())) {
        completeResponse(request, asyncContext, heartbeatResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, null, HeartbeatResponseBody.class);
        return;
    }

    // validate body
    if (StringUtils.isAnyBlank(heartbeatRequestBody.getClientType(), heartbeatRequestBody.getConsumerGroup())
        || CollectionUtils.isEmpty(heartbeatRequestBody.getHeartbeatEntities())) {
        completeResponse(request, asyncContext, heartbeatResponseHeader,
            EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, null, HeartbeatResponseBody.class);
        return;
    }

    // Clients are staged here first so that an ACL failure part-way through the entity list
    // returns before anything has been written to the shared mapping.
    final Map<String, List<Client>> tmpMap = new ConcurrentHashMap<>();
    final boolean securityEnabled = httpConfiguration.isEventMeshServerSecurityEnable();
    final List<HeartbeatRequestBody.HeartbeatEntity> heartbeatEntities = heartbeatRequestBody.getHeartbeatEntities();

    for (final HeartbeatRequestBody.HeartbeatEntity heartbeatEntity : heartbeatEntities) {
        final Client client = new Client();
        client.setEnv(heartbeatRequestHeader.getEnv());
        client.setIdc(heartbeatRequestHeader.getIdc());
        client.setSys(heartbeatRequestHeader.getSys());
        client.setIp(heartbeatRequestHeader.getIp());
        client.setPid(heartbeatRequestHeader.getPid());
        client.setConsumerGroup(heartbeatRequestBody.getConsumerGroup());
        client.setTopic(heartbeatEntity.getTopic());
        client.setUrl(heartbeatEntity.getUrl());
        client.setLastUpTime(new Date());

        // Entities without a topic or url cannot be registered; skip them silently.
        if (StringUtils.isAnyBlank(client.getTopic(), client.getUrl())) {
            continue;
        }

        // do acl check
        if (securityEnabled) {
            try {
                // The code parse stays inside the try so a malformed code is reported as an ACL error.
                this.acl.doAclCheckInHttpHeartbeat(
                    remoteAddr,
                    heartbeatRequestHeader.getUsername(),
                    heartbeatRequestHeader.getPasswd(),
                    heartbeatRequestHeader.getSys(),
                    client.getTopic(),
                    Integer.parseInt(heartbeatRequestHeader.getCode()));
            } catch (Exception e) {
                completeResponse(request, asyncContext, heartbeatResponseHeader,
                    EventMeshRetCode.EVENTMESH_ACL_ERR, e.getMessage(), HeartbeatResponseBody.class);
                log.warn("CLIENT HAS NO PERMISSION,HeartBeatProcessor subscribe failed", e);
                return;
            }
        }

        final String groupTopicKey = client.getConsumerGroup() + "@" + client.getTopic();
        tmpMap.computeIfAbsent(groupTopicKey, k -> new ArrayList<>()).add(client);
    }

    final ConcurrentHashMap<String, List<Client>> clientInfoMap =
        eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping();
    // The read-merge-write sequence below is compound, so it is guarded by the map's monitor.
    synchronized (clientInfoMap) {
        for (final Map.Entry<String, List<Client>> groupTopicClientMapping : tmpMap.entrySet()) {
            final String groupTopicKey = groupTopicClientMapping.getKey();
            final List<Client> localClientList = clientInfoMap.get(groupTopicKey);
            if (CollectionUtils.isEmpty(localClientList)) {
                clientInfoMap.put(groupTopicKey, groupTopicClientMapping.getValue());
            } else {
                // NOTE(review): supplyClientInfoList presumably merges the new clients into
                // localClientList in place — confirm against its definition.
                supplyClientInfoList(groupTopicClientMapping.getValue(), localClientList);
                clientInfoMap.put(groupTopicKey, localClientList);
            }
        }
    }

    final long startTime = System.currentTimeMillis();
    try {
        final CompleteHandler<HttpCommand> handler = httpCommand -> {
            try {
                log.debug("{}", httpCommand);
                eventMeshHTTPServer.sendResponse(ctx, httpCommand.httpResponse());
                eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordHTTPReqResTimeCost(
                    System.currentTimeMillis() - asyncContext.getRequest().getReqTime());
            } catch (Exception ex) {
                // Best-effort: a failure here must not propagate out of the completion callback,
                // but it is logged so that lost heartbeat responses remain diagnosable.
                log.warn("HeartBeatProcessor failed to send heartbeat response, remote={}", remoteAddr, ex);
            }
        };
        final HttpCommand responseEventMeshCommand = request.createHttpCommandResponse(EventMeshRetCode.SUCCESS);
        asyncContext.onComplete(responseEventMeshCommand, handler);
    } catch (Exception e) {
        completeResponse(request, asyncContext, heartbeatResponseHeader,
            EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR,
            EventMeshRetCode.EVENTMESH_HEARTBEAT_ERR.getErrMsg() + EventMeshUtil.stackTrace(e, 2),
            HeartbeatResponseBody.class);
        final long elapsedTime = System.currentTimeMillis() - startTime;
        log.error("message|eventMesh2mq|REQ|ASYNC|heartBeatMessageCost={}ms", elapsedTime, e);
        eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgFailed();
        eventMeshHTTPServer.getEventMeshHttpMetricsManager().getHttpMetrics().recordSendMsgCost(elapsedTime);
    }
}