in deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalLauncher.java [34:134]
public static void main(String[] args) {
try {
logger.info("## set default uncaught exception handler");
setGlobalUncaughtExceptionHandler();
// 支持rocketmq client 配置日志路径
System.setProperty("rocketmq.client.logUseSlf4j","true");
logger.info("## load canal configurations");
String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
properties.load(new FileInputStream(conf));
}
final CanalStarter canalStater = new CanalStarter(properties);
String managerAddress = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
if (StringUtils.isNotEmpty(managerAddress)) {
String user = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
String passwd = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
if (StringUtils.isEmpty(passwd)) {
throw new IllegalArgumentException(
"canal.admin.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");
}
String adminPort = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110");
boolean autoRegister = BooleanUtils.toBoolean(CanalController.getProperty(properties,
CanalConstants.CANAL_ADMIN_AUTO_REGISTER));
String autoCluster = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_AUTO_CLUSTER);
String name = CanalController.getProperty(properties, CanalConstants.CANAL_ADMIN_REGISTER_NAME);
if (StringUtils.isEmpty(name)) {
name = AddressUtils.getHostName();
}
String registerIp = CanalController.getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
if (StringUtils.isEmpty(registerIp)) {
registerIp = AddressUtils.getHostIp();
}
final PlainCanalConfigClient configClient = new PlainCanalConfigClient(managerAddress,
user,
passwd,
registerIp,
Integer.parseInt(adminPort),
autoRegister,
autoCluster,
name);
PlainCanal canalConfig = configClient.findServer(null);
if (canalConfig == null) {
throw new IllegalArgumentException("managerAddress:" + managerAddress
+ " can't not found config for [" + registerIp + ":" + adminPort
+ "]");
}
Properties managerProperties = canalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
int scanIntervalInSecond = Integer.valueOf(CanalController.getProperty(managerProperties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
executor.scheduleWithFixedDelay(new Runnable() {
private PlainCanal lastCanalConfig;
public void run() {
try {
if (lastCanalConfig == null) {
lastCanalConfig = configClient.findServer(null);
} else {
PlainCanal newCanalConfig = configClient.findServer(lastCanalConfig.getMd5());
if (newCanalConfig != null) {
// 远程配置canal.properties修改重新加载整个应用
canalStater.stop();
Properties managerProperties = newCanalConfig.getProperties();
// merge local
managerProperties.putAll(properties);
canalStater.setProperties(managerProperties);
canalStater.start();
lastCanalConfig = newCanalConfig;
}
}
} catch (Throwable e) {
logger.error("scan failed", e);
}
}
}, 0, scanIntervalInSecond, TimeUnit.SECONDS);
canalStater.setProperties(managerProperties);
} else {
canalStater.setProperties(properties);
}
canalStater.start();
runningLatch.await();
executor.shutdownNow();
} catch (Throwable e) {
logger.error("## Something goes wrong when starting up the canal Server:", e);
}
}