in dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/migration/MigrationRuleListener.java [161:252]
/**
 * Accepts a migration-rule change event and queues its rule for the background rule worker.
 *
 * <p>An empty rule is replaced by {@code INIT} (fall back to startup status). The rule is then
 * appended to {@code ruleQueue}. The first successful flip of {@code executorSubmit} from
 * {@code false} to {@code true} submits the single queue-consuming task to
 * {@code ruleManageExecutor}; its handle is stored in {@code ruleMigrationFuture}.
 *
 * <p>If the calling thread is interrupted while enqueueing, the failure is logged and the
 * thread's interrupt status is restored so the caller can still observe it.
 *
 * @param event the change event; its content is the raw migration rule text
 */
public synchronized void process(ConfigChangedEvent event) {
    String rawRule = event.getContent();
    if (StringUtils.isEmpty(rawRule)) {
        // fail back to startup status
        rawRule = INIT;
    }
    try {
        ruleQueue.put(rawRule);
    } catch (InterruptedException e) {
        logger.error(
                COMMON_THREAD_INTERRUPTED_EXCEPTION,
                "",
                "",
                "Put rawRule to rule management queue failed. rawRule: " + rawRule,
                e);
        // Catching InterruptedException clears the flag; set it again (after logging) so the
        // interruption is not lost to the caller.
        Thread.currentThread().interrupt();
    }
    if (executorSubmit.compareAndSet(false, true)) {
        ruleMigrationFuture = ruleManageExecutor.submit(this::consumeQueuedRules);
    }
}

/**
 * Queue consumer run on {@code ruleManageExecutor}: takes rules from {@code ruleQueue} one at a
 * time, skips empty and duplicated rules, and applies every other rule to all registered handlers.
 *
 * <p>The loop ends when the worker thread is interrupted (for example by
 * {@code Future.cancel(true)} or {@code ExecutorService.shutdownNow()}); the interrupt status is
 * left set on exit. {@code executorSubmit} is not reset here, so {@link #process} will not start
 * a replacement worker afterwards.
 */
private void consumeQueuedRules() {
    while (!Thread.currentThread().isInterrupted()) {
        String rule;
        try {
            rule = ruleQueue.take();
            if (StringUtils.isEmpty(rule)) {
                // Nothing usable was queued; pause briefly before polling again.
                Thread.sleep(1000);
                continue;
            }
        } catch (InterruptedException e) {
            logger.error(
                    COMMON_THREAD_INTERRUPTED_EXCEPTION, "", "", "Poll Rule from config center failed.", e);
            // Interruption is the stop signal for this worker: restore the flag and leave the
            // loop instead of blocking on the queue again.
            Thread.currentThread().interrupt();
            return;
        }
        if (Objects.equals(this.rawRule, rule)) {
            logger.info("Ignore duplicated rule");
            continue;
        }
        logger.info("Using the following migration rule to migrate:");
        logger.info(rule);
        setRawRule(rule);
        if (CollectionUtils.isEmptyMap(handlers)) {
            continue;
        }
        migrateAllHandlers();
    }
}

/**
 * Runs {@code doMigrate} with the current {@code rule} on every registered handler in parallel
 * and waits for all of the tasks to finish.
 *
 * <p>A short-lived fixed thread pool (at most 100 threads) is created for the batch and shut
 * down afterwards. A failure of a single handler is logged and does not stop the others. If the
 * waiting thread is interrupted, waiting stops early and the interrupt status is restored;
 * tasks that were already submitted are left to complete.
 */
private void migrateAllHandlers() {
    ExecutorService executorService = null;
    try {
        executorService = Executors.newFixedThreadPool(
                Math.min(handlers.size(), 100), new NamedThreadFactory("Dubbo-Invoker-Migrate"));
        List<Future<?>> migrationFutures = new ArrayList<>(handlers.size());
        for (MigrationRuleHandler<?> handler : handlers.values()) {
            Future<?> future = executorService.submit(() -> handler.doMigrate(this.rule));
            migrationFutures.add(future);
        }
        for (Future<?> future : migrationFutures) {
            try {
                future.get();
            } catch (InterruptedException ie) {
                logger.warn(
                        INTERNAL_ERROR,
                        "unknown error in registry module",
                        "",
                        "Interrupted while waiting for migration async task to finish.");
                // Further get() calls would fail immediately once the flag is restored,
                // so stop waiting and let the caller react to the interruption.
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException ee) {
                logger.error(
                        INTERNAL_ERROR,
                        "unknown error in registry module",
                        "",
                        "Migration async task failed.",
                        ee.getCause());
            }
        }
    } catch (Throwable t) {
        logger.error(
                INTERNAL_ERROR,
                "unknown error in registry module",
                "",
                "Error occurred when migration.",
                t);
    } finally {
        if (executorService != null) {
            // Orderly shutdown: already-submitted migrations may finish, no new ones accepted.
            executorService.shutdown();
        }
    }
}