in shenyu-admin/src/main/java/org/apache/shenyu/admin/disruptor/subscriber/URIRegisterExecutorSubscriber.java [60:109]
public void executor(final Collection<URIRegisterDTO> dataList) {
    // Dispatches a batch of URI events to the client register services.
    // Events are grouped by rpcType, each group is split by buildData(...) into
    // per-selector lists, and every selector list is then split by event type
    // into separate registerURI / heartbeat / offline calls.
    if (CollectionUtils.isEmpty(dataList)) {
        return;
    }
    // Events with a blank rpcType cannot be matched to a service and are skipped.
    final Map<String, List<URIRegisterDTO>> groupByRpcType = dataList.stream()
            .filter(data -> StringUtils.isNotBlank(data.getRpcType()))
            .collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));
    for (Map.Entry<String, List<URIRegisterDTO>> entry : groupByRpcType.entrySet()) {
        final String rpcType = entry.getKey();
        // An rpcType with no service in shenyuClientRegisterService is ignored.
        Optional.ofNullable(shenyuClientRegisterService.get(rpcType)).ifPresent(service -> {
            final Map<String, List<URIRegisterDTO>> bySelector = buildData(entry.getValue());
            bySelector.forEach((selectorName, uriList) -> {
                final List<URIRegisterDTO> register = new LinkedList<>();
                final List<URIRegisterDTO> heartbeat = new LinkedList<>();
                final List<URIRegisterDTO> offline = new LinkedList<>();
                for (URIRegisterDTO uri : uriList) {
                    final EventType eventType = uri.getEventType();
                    if (Objects.isNull(eventType) || EventType.REGISTER.equals(eventType)) {
                        // A null event type is treated as a register
                        // (presumably sent by older client versions).
                        register.add(uri);
                    } else if (EventType.OFFLINE.equals(eventType)) {
                        offline.add(uri);
                    } else if (EventType.HEARTBEAT.equals(eventType)) {
                        heartbeat.add(uri);
                    }
                    // Any other event type is not dispatched.
                }
                // Each list is dispatched only when it holds at least one non-blank
                // namespaceId; an empty list, or one with only blank ids, is skipped.
                // NOTE(review): the whole list is sent under the FIRST non-blank
                // namespaceId. If a selector's list can mix namespaces, some entries
                // would go out under the wrong one - confirm the caller/buildData
                // guarantees a single namespace per selector list.
                firstNamespaceId(register)
                        .ifPresent(namespaceId -> service.registerURI(selectorName, register, namespaceId));
                firstNamespaceId(heartbeat)
                        .ifPresent(namespaceId -> service.heartbeat(selectorName, heartbeat, namespaceId));
                firstNamespaceId(offline)
                        .ifPresent(namespaceId -> service.offline(selectorName, offline, namespaceId));
            });
        });
    }
}

/**
 * Finds the first non-blank namespace id among the given URI events.
 *
 * @param uriList the URI events to scan, in iteration order
 * @return the first non-blank namespace id, or empty when the list is empty
 *         or no event carries a non-blank namespace id
 */
private static Optional<String> firstNamespaceId(final List<URIRegisterDTO> uriList) {
    return uriList.stream()
            .map(URIRegisterDTO::getNamespaceId)
            .filter(StringUtils::isNotBlank)
            .findFirst();
}