public synchronized void process()

in dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java [161:252]


    public synchronized void process(ConfigChangedEvent event) {
        String rawRule = event.getContent();
        if (StringUtils.isEmpty(rawRule)) {
            // fail back to startup status
            rawRule = INIT;
            // logger.warn(COMMON_PROPERTY_TYPE_MISMATCH, "", "", "Received empty migration rule, will ignore.");
        }
        try {
            ruleQueue.put(rawRule);
        } catch (InterruptedException e) {
            logger.error(
                    COMMON_THREAD_INTERRUPTED_EXCEPTION,
                    "",
                    "",
                    "Put rawRule to rule management queue failed. rawRule: " + rawRule,
                    e);
        }

        if (executorSubmit.compareAndSet(false, true)) {
            ruleMigrationFuture = ruleManageExecutor.submit(() -> {
                while (true) {
                    String rule = "";
                    try {
                        rule = ruleQueue.take();
                        if (StringUtils.isEmpty(rule)) {
                            Thread.sleep(1000);
                        }
                    } catch (InterruptedException e) {
                        logger.error(
                                COMMON_THREAD_INTERRUPTED_EXCEPTION, "", "", "Poll Rule from config center failed.", e);
                    }
                    if (StringUtils.isEmpty(rule)) {
                        continue;
                    }
                    if (Objects.equals(this.rawRule, rule)) {
                        logger.info("Ignore duplicated rule");
                        continue;
                    }

                    logger.info("Using the following migration rule to migrate:");
                    logger.info(rule);

                    setRawRule(rule);

                    if (CollectionUtils.isEmptyMap(handlers)) {
                        continue;
                    }

                    ExecutorService executorService = null;
                    try {
                        executorService = Executors.newFixedThreadPool(
                                Math.min(handlers.size(), 100), new NamedThreadFactory("Dubbo-Invoker-Migrate"));
                        List<Future<?>> migrationFutures = new ArrayList<>(handlers.size());
                        for (MigrationRuleHandler<?> handler : handlers.values()) {
                            Future<?> future = executorService.submit(() -> handler.doMigrate(this.rule));
                            migrationFutures.add(future);
                        }

                        for (Future<?> future : migrationFutures) {
                            try {
                                future.get();
                            } catch (InterruptedException ie) {
                                logger.warn(
                                        INTERNAL_ERROR,
                                        "unknown error in registry module",
                                        "",
                                        "Interrupted while waiting for migration async task to finish.");
                            } catch (ExecutionException ee) {
                                logger.error(
                                        INTERNAL_ERROR,
                                        "unknown error in registry module",
                                        "",
                                        "Migration async task failed.",
                                        ee.getCause());
                            }
                        }
                    } catch (Throwable t) {
                        logger.error(
                                INTERNAL_ERROR,
                                "unknown error in registry module",
                                "",
                                "Error occurred when migration.",
                                t);
                    } finally {
                        if (executorService != null) {
                            executorService.shutdown();
                        }
                    }
                }
            });
        }
    }