in config/src/main/java/com/alibaba/nacos/config/server/service/ConfigMigrateService.java [1041:1142]
/**
 * Copies every beta config row and every tag config row into the gray config storage.
 *
 * <p>Rows are read page by page (100 rows per page) and each row is handed to a bounded worker pool.
 * The pool uses {@link ThreadPoolExecutor.CallerRunsPolicy}, so when the queue is full the submitting
 * thread runs the task itself. The beta phase is drained before the tag phase starts, and the pool is
 * always shut down when this method exits, whether normally or with an exception.
 *
 * @throws Exception if counting, paging or task submission fails, or if the calling thread is
 *                   interrupted while waiting for the submitted tasks to finish
 */
private void doCheckMigrate() throws Exception {
    int migrateMulti = EnvUtil.getProperty("nacos.gray.migrate.executor.multi", Integer.class, Integer.valueOf(4));
    int workerCount = ThreadUtils.getSuitableThreadCount(migrateMulti);
    ThreadPoolExecutor executorService = new ThreadPoolExecutor(workerCount, workerCount, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(PropertyUtil.getAllDumpPageSize() * migrateMulti),
            r -> new Thread(r, "gray-migrate-worker"), new ThreadPoolExecutor.CallerRunsPolicy());
    try {
        int pageSize = 100;

        // Phase 1: beta -> gray.
        int rowCount = configInfoBetaPersistService.configInfoBetaCount();
        int pageCount = (int) Math.ceil(rowCount * 1.0 / pageSize);
        int actualRowCount = 0;
        for (int pageNo = 1; pageNo <= pageCount; pageNo++) {
            Page<ConfigInfoBetaWrapper> page = configInfoBetaPersistService.findAllConfigInfoBetaForDumpAll(pageNo,
                    pageSize);
            if (page != null) {
                for (ConfigInfoBetaWrapper cf : page.getPageItems()) {
                    executorService.execute(() -> migrateSingleBetaToGray(cf));
                }
                actualRowCount += page.getPageItems().size();
                DEFAULT_LOG.info("[gray-migrate-beta] submit gray task {} / {}", actualRowCount, rowCount);
            }
        }
        waitGrayMigrateTasksDrained(executorService, "gray-migrate-beta");

        // Phase 2: tag -> gray.
        rowCount = configInfoTagPersistService.configInfoTagCount();
        pageCount = (int) Math.ceil(rowCount * 1.0 / pageSize);
        actualRowCount = 0;
        for (int pageNo = 1; pageNo <= pageCount; pageNo++) {
            Page<ConfigInfoTagWrapper> page = configInfoTagPersistService.findAllConfigInfoTagForDumpAll(pageNo,
                    pageSize);
            if (page != null) {
                for (ConfigInfoTagWrapper cf : page.getPageItems()) {
                    executorService.execute(() -> migrateSingleTagToGray(cf));
                }
                actualRowCount += page.getPageItems().size();
                DEFAULT_LOG.info("[gray-migrate-tag] submit gray task {} / {}", actualRowCount, rowCount);
            }
        }
        waitGrayMigrateTasksDrained(executorService, "gray-migrate-tag");
    } finally {
        // Shut down on every exit path; otherwise a failure above would leave the pool's
        // core threads alive (core size == max size, no core timeout configured).
        executorService.shutdown();
    }
}

/**
 * Writes one beta config row into gray storage when no gray row exists for it yet, or when the
 * existing gray row has an older last-modified time than the beta row.
 *
 * @param cf the beta config row to migrate
 */
private void migrateSingleBetaToGray(ConfigInfoBetaWrapper cf) {
    GRAY_MIGRATE_FLAG.set(true);
    try {
        ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(cf.getDataId(),
                cf.getGroup(), cf.getTenant(), BetaGrayRule.TYPE_BETA);
        if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
            DEFAULT_LOG.info("[migrate beta to gray] dataId={}, group={}, tenant={}, md5={}", cf.getDataId(),
                    cf.getGroup(), cf.getTenant(), cf.getMd5());
            ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(BetaGrayRule.TYPE_BETA,
                    BetaGrayRule.VERSION, cf.getBetaIps(), BetaGrayRule.PRIORITY);
            configInfoGrayPersistService.insertOrUpdateGray(cf, BetaGrayRule.TYPE_BETA,
                    GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
                    NetUtils.localIp(), "nacos_auto_migrate");
        }
    } finally {
        // Always clear the flag, including when the row is skipped or persistence throws. The thread
        // running this task is reused (pool worker, or the submitter under CallerRunsPolicy), so a
        // stale "true" would carry over to later work on that thread.
        // NOTE(review): assumes GRAY_MIGRATE_FLAG is per-thread state - confirm at its declaration.
        GRAY_MIGRATE_FLAG.set(false);
    }
}

/**
 * Writes one tag config row into gray storage when no gray row exists for it yet, or when the
 * existing gray row has an older last-modified time than the tag row. The gray name is the tag type
 * followed by {@code "_"} and the row's tag.
 *
 * @param cf the tag config row to migrate
 */
private void migrateSingleTagToGray(ConfigInfoTagWrapper cf) {
    GRAY_MIGRATE_FLAG.set(true);
    try {
        String grayName = TagGrayRule.TYPE_TAG + "_" + cf.getTag();
        ConfigInfoGrayWrapper configInfo4Gray = configInfoGrayPersistService.findConfigInfo4Gray(cf.getDataId(),
                cf.getGroup(), cf.getTenant(), grayName);
        if (configInfo4Gray == null || configInfo4Gray.getLastModified() < cf.getLastModified()) {
            DEFAULT_LOG.info("[migrate tag to gray] dataId={}, group={}, tenant={}, md5={}", cf.getDataId(),
                    cf.getGroup(), cf.getTenant(), cf.getMd5());
            ConfigGrayPersistInfo localConfigGrayPersistInfo = new ConfigGrayPersistInfo(TagGrayRule.TYPE_TAG,
                    TagGrayRule.VERSION, cf.getTag(), TagGrayRule.PRIORITY);
            configInfoGrayPersistService.insertOrUpdateGray(cf, grayName,
                    GrayRuleManager.serializeConfigGrayPersistInfo(localConfigGrayPersistInfo),
                    NetUtils.localIp(), "nacos_auto_migrate");
        }
    } finally {
        // Same reasoning as the beta task: never leave the flag set on a reused thread.
        GRAY_MIGRATE_FLAG.set(false);
    }
}

/**
 * Blocks until the executor reports no queued and no active tasks, checking once per second.
 *
 * @param executor the migrate worker pool to watch
 * @param logTag   the tag placed in square brackets at the start of each log line
 * @throws InterruptedException if the calling thread is interrupted while sleeping; the interrupt
 *                              status is restored before the exception is rethrown
 */
private void waitGrayMigrateTasksDrained(ThreadPoolExecutor executor, String logTag)
        throws InterruptedException {
    try {
        int unfinishedTaskCount;
        while ((unfinishedTaskCount = executor.getQueue().size() + executor.getActiveCount()) > 0) {
            DEFAULT_LOG.info("[{}] wait {} migrate tasks to be finished", logTag, unfinishedTaskCount);
            Thread.sleep(1000L);
        }
    } catch (InterruptedException e) {
        // Restore the interrupt status so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        DEFAULT_LOG.error("[" + logTag + "] wait migrate tasks to be finished error", e);
        throw e;
    }
}