in eventmesh-runtime/src/main/java/org/apache/eventmesh/runtime/core/protocol/http/processor/LocalUnSubscribeEventProcessor.java [70:229]
public void handler(final HandlerService.HandlerSpecific handlerSpecific, final HttpRequest httpRequest) throws Exception {
final AsyncContext<HttpEventWrapper> asyncContext = handlerSpecific.getAsyncContext();
final ChannelHandlerContext ctx = handlerSpecific.getCtx();
final HttpEventWrapper requestWrapper = asyncContext.getRequest();
String localAddress = IPUtils.getLocalAddress();
String remoteAddr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
log.info("uri={}|{}|client2eventMesh|from={}|to={}",
requestWrapper.getRequestURI(), EventMeshConstants.PROTOCOL_HTTP, remoteAddr, localAddress);
// user request header
requestWrapper.getHeaderMap().put(ProtocolKey.ClientInstanceKey.IP.getKey(), remoteAddr);
// build sys header
requestWrapper.buildSysHeaderForClient();
final Map<String, Object> responseHeaderMap = builderResponseHeaderMap(requestWrapper);
final Map<String, Object> sysHeaderMap = requestWrapper.getSysHeaderMap();
final Map<String, Object> responseBodyMap = new HashMap<>();
// validate header
if (validateSysHeader(sysHeaderMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_HEADER_ERR, responseHeaderMap,
responseBodyMap, null);
return;
}
// validate body
final byte[] requestBody = requestWrapper.getBody();
final Map<String, Object> requestBodyMap = Optional.ofNullable(JsonUtils.parseTypeReferenceObject(
new String(requestBody, Constants.DEFAULT_CHARSET),
new TypeReference<HashMap<String, Object>>() {
})).orElseGet(Maps::newHashMap);
if (validatedRequestBodyMap(requestBodyMap)) {
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_PROTOCOL_BODY_ERR, responseHeaderMap,
responseBodyMap, null);
return;
}
final String unSubscribeUrl = requestBodyMap.get(EventMeshConstants.URL).toString();
final String consumerGroup = requestBodyMap.get(EventMeshConstants.CONSUMER_GROUP).toString();
// unSubscriptionItem
final List<String> unSubTopicList = Optional.ofNullable(JsonUtils.parseTypeReferenceObject(
JsonUtils.toJSONString(requestBodyMap.get(EventMeshConstants.MANAGE_TOPIC)),
new TypeReference<List<String>>() {
})).orElseGet(Collections::emptyList);
final String pid = sysHeaderMap.get(ProtocolKey.ClientInstanceKey.PID.getKey()).toString();
synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()) {
boolean isChange = true;
registerClient(requestWrapper, consumerGroup, unSubTopicList, unSubscribeUrl);
for (final String unSubTopic : unSubTopicList) {
final List<Client> groupTopicClients = eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping()
.get(consumerGroup + "@" + unSubTopic);
final Iterator<Client> clientIterator = groupTopicClients.iterator();
while (clientIterator.hasNext()) {
final Client client = clientIterator.next();
if (StringUtils.equals(client.getPid(), pid)
&& StringUtils.equals(client.getUrl(), unSubscribeUrl)) {
log.warn("client {} start unsubscribe", JsonUtils.toJSONString(client));
clientIterator.remove();
}
}
if (CollectionUtils.isNotEmpty(groupTopicClients)) {
// change url
final Map<String, List<String>> idcUrls = new HashMap<>();
final Set<String> clientUrls = new HashSet<>();
for (final Client client : groupTopicClients) {
// remove subscribed url
if (!StringUtils.equals(unSubscribeUrl, client.getUrl())) {
clientUrls.add(client.getUrl());
List<String> urls = idcUrls.computeIfAbsent(client.getIdc(), list -> new ArrayList<>());
urls.add(StringUtils.deleteWhitespace(client.getUrl()));
}
}
synchronized (eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping()) {
final ConsumerGroupConf consumerGroupConf =
eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup);
final Map<String, ConsumerGroupTopicConf> map =
consumerGroupConf.getConsumerGroupTopicConf();
for (final Map.Entry<String, ConsumerGroupTopicConf> entry : map.entrySet()) {
// only modify the topic to subscribe
if (StringUtils.equals(unSubTopic, entry.getKey())) {
final ConsumerGroupTopicConf latestTopicConf = new ConsumerGroupTopicConf();
latestTopicConf.setConsumerGroup(consumerGroup);
latestTopicConf.setTopic(unSubTopic);
latestTopicConf.setSubscriptionItem(entry.getValue().getSubscriptionItem());
latestTopicConf.setUrls(clientUrls);
latestTopicConf.setIdcUrls(idcUrls);
map.put(unSubTopic, latestTopicConf);
}
}
eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().put(consumerGroup, consumerGroupConf);
}
} else {
isChange = false;
break;
}
}
final long startTime = System.currentTimeMillis();
if (isChange) {
try {
eventMeshHTTPServer.getConsumerManager().notifyConsumerManager(consumerGroup,
eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().get(consumerGroup));
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
} catch (Exception e) {
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+ "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap,
responseBodyMap, null);
}
} else {
// remove
try {
eventMeshHTTPServer.getConsumerManager()
.notifyConsumerManager(consumerGroup, null);
responseBodyMap.put(EventMeshConstants.RET_CODE, EventMeshRetCode.SUCCESS.getRetCode());
responseBodyMap.put(EventMeshConstants.RET_MSG, EventMeshRetCode.SUCCESS.getErrMsg());
handlerSpecific.sendResponse(responseHeaderMap, responseBodyMap);
// clean ClientInfo
eventMeshHTTPServer.getSubscriptionManager().getLocalClientInfoMapping().keySet()
.removeIf(s -> StringUtils.contains(s, consumerGroup));
// clean ConsumerGroupInfo
eventMeshHTTPServer.getSubscriptionManager().getLocalConsumerGroupMapping().keySet()
.removeIf(s -> StringUtils.equals(consumerGroup, s));
} catch (Exception e) {
log.error("message|eventMesh2mq|REQ|ASYNC|send2MQCost={}ms"
+ "|topic={}|url={}", System.currentTimeMillis() - startTime, JsonUtils.toJSONString(unSubTopicList), unSubscribeUrl, e);
handlerSpecific.sendErrorResponse(EventMeshRetCode.EVENTMESH_UNSUBSCRIBE_ERR, responseHeaderMap,
responseBodyMap, null);
}
}
// Update service metadata
eventMeshHTTPServer.getSubscriptionManager().updateMetaData();
}
}