public void executor()

in shenyu-admin/src/main/java/org/apache/shenyu/admin/disruptor/subscriber/URIRegisterExecutorSubscriber.java [60:109]


    /**
     * Routes a batch of URI register events to the register service of each rpc type.
     *
     * <p>Entries whose rpcType is blank are discarded; the remainder is grouped by rpcType.
     * A group is only processed when {@code shenyuClientRegisterService} holds a service for
     * its rpcType. The group's entries are passed through {@code buildData}, and every
     * resulting (selectorName, entries) pair is split by event type:
     * <ul>
     *   <li>{@code null} or {@code REGISTER} -&gt; {@code registerURI}</li>
     *   <li>{@code HEARTBEAT} -&gt; {@code heartbeat}</li>
     *   <li>{@code OFFLINE} -&gt; {@code offline}</li>
     * </ul>
     * Entries carrying any other event type are ignored. Each bucket is sent with the first
     * non-blank namespaceId found among its own entries; a bucket without one is not sent.
     *
     * @param dataList the URI register events; nothing happens when
     *                 {@code CollectionUtils.isEmpty} reports it as empty
     */
    public void executor(final Collection<URIRegisterDTO> dataList) {
        if (CollectionUtils.isEmpty(dataList)) {
            return;
        }
        final Map<String, List<URIRegisterDTO>> byRpcType = dataList.stream()
                .filter(dto -> StringUtils.isNotBlank(dto.getRpcType()))
                .collect(Collectors.groupingBy(URIRegisterDTO::getRpcType));
        byRpcType.forEach((rpcType, rpcTypeUris) -> Optional.ofNullable(shenyuClientRegisterService.get(rpcType))
                .ifPresent(service -> buildData(rpcTypeUris).forEach((selectorName, selectorUris) -> {
                    final List<URIRegisterDTO> toRegister = new LinkedList<>();
                    final List<URIRegisterDTO> toHeartbeat = new LinkedList<>();
                    final List<URIRegisterDTO> toOffline = new LinkedList<>();
                    for (URIRegisterDTO uri : selectorUris) {
                        final EventType type = uri.getEventType();
                        if (type == null || EventType.REGISTER.equals(type)) {
                            // a missing event type is treated as a register request (old client versions)
                            toRegister.add(uri);
                        } else if (EventType.OFFLINE.equals(type)) {
                            toOffline.add(uri);
                        } else if (EventType.HEARTBEAT.equals(type)) {
                            toHeartbeat.add(uri);
                        }
                    }
                    // Call order is fixed: register, then heartbeat, then offline.
                    // An empty bucket has no namespaceId, so it is skipped without an explicit emptiness check.
                    firstNamespaceId(toRegister)
                            .ifPresent(namespaceId -> service.registerURI(selectorName, toRegister, namespaceId));
                    firstNamespaceId(toHeartbeat)
                            .ifPresent(namespaceId -> service.heartbeat(selectorName, toHeartbeat, namespaceId));
                    firstNamespaceId(toOffline)
                            .ifPresent(namespaceId -> service.offline(selectorName, toOffline, namespaceId));
                })));
    }

    /**
     * Finds the first non-blank namespaceId among the given entries, in list order.
     *
     * @param uriList the entries to scan
     * @return the first non-blank namespaceId, or an empty Optional when the list is empty
     *         or every namespaceId is blank
     */
    private static Optional<String> firstNamespaceId(final List<URIRegisterDTO> uriList) {
        return uriList.stream()
                .map(URIRegisterDTO::getNamespaceId)
                .filter(StringUtils::isNotBlank)
                .findFirst();
    }