in deployer/src/main/java/com/alibaba/otter/canal/deployer/CanalController.java [83:345]
public CanalController(final Properties properties){
managerClients = MigrateMap.makeComputingMap(this::getManagerClient);
// 初始化全局参数设置
globalInstanceConfig = initGlobalConfig(properties);
instanceConfigs = new MapMaker().makeMap();
// 初始化instance config
initInstanceConfig(properties);
// init socketChannel
String socketChannel = getProperty(properties, CanalConstants.CANAL_SOCKETCHANNEL);
if (StringUtils.isNotEmpty(socketChannel)) {
System.setProperty(CanalConstants.CANAL_SOCKETCHANNEL, socketChannel);
}
// 兼容1.1.0版本的ak/sk参数名
String accesskey = getProperty(properties, "canal.instance.rds.accesskey");
String secretkey = getProperty(properties, "canal.instance.rds.secretkey");
if (StringUtils.isNotEmpty(accesskey)) {
System.setProperty(CanalConstants.CANAL_ALIYUN_ACCESSKEY, accesskey);
}
if (StringUtils.isNotEmpty(secretkey)) {
System.setProperty(CanalConstants.CANAL_ALIYUN_SECRETKEY, secretkey);
}
// 准备canal server
ip = getProperty(properties, CanalConstants.CANAL_IP);
registerIp = getProperty(properties, CanalConstants.CANAL_REGISTER_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT, "11111"));
adminPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_ADMIN_PORT, "11110"));
embeddedCanalServer = CanalServerWithEmbedded.instance();
embeddedCanalServer.setCanalInstanceGenerator(instanceGenerator);// 设置自定义的instanceGenerator
int metricsPort = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_METRICS_PULL_PORT, "11112"));
embeddedCanalServer.setMetricsPort(metricsPort);
this.adminUser = getProperty(properties, CanalConstants.CANAL_ADMIN_USER);
this.adminPasswd = getProperty(properties, CanalConstants.CANAL_ADMIN_PASSWD);
String user = getProperty(properties, CanalConstants.CANAL_USER);
String passwd = getProperty(properties, CanalConstants.CANAL_PASSWD);
if (StringUtils.isNotEmpty(user) && StringUtils.isEmpty(passwd)) {
throw new IllegalArgumentException(
"canal.user = " + user + " , but canal.passwd is empty , pls check https://github.com/alibaba/canal/issues/4941");
}
embeddedCanalServer.setUser(user);
embeddedCanalServer.setPasswd(passwd);
String canalWithoutNetty = getProperty(properties, CanalConstants.CANAL_WITHOUT_NETTY);
if (canalWithoutNetty == null || "false".equals(canalWithoutNetty)) {
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);
}
// 处理下ip为空,默认使用hostIp暴露到zk中
if (StringUtils.isEmpty(ip) && StringUtils.isEmpty(registerIp)) {
ip = registerIp = AddressUtils.getHostIp();
}
if (StringUtils.isEmpty(ip)) {
ip = AddressUtils.getHostIp();
}
if (StringUtils.isEmpty(registerIp)) {
registerIp = ip; // 兼容以前配置
}
final String zkServers = getProperty(properties, CanalConstants.CANAL_ZKSERVERS);
if (StringUtils.isNotEmpty(zkServers)) {
zkclientx = ZkClientx.getZkClient(zkServers);
// 初始化系统目录
zkclientx.createPersistent(ZookeeperPathUtils.DESTINATION_ROOT_NODE, true);
zkclientx.createPersistent(ZookeeperPathUtils.CANAL_CLUSTER_ROOT_NODE, true);
}
final ServerRunningData serverData = new ServerRunningData(registerIp + ":" + port);
ServerRunningMonitors.setServerData(serverData);
ServerRunningMonitors.setRunningMonitors(MigrateMap.makeComputingMap((Function<String, ServerRunningMonitor>) destination -> {
ServerRunningMonitor runningMonitor = new ServerRunningMonitor(serverData);
runningMonitor.setDestination(destination);
runningMonitor.setListener(new ServerRunningListener() {
public void processActiveEnter() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
embeddedCanalServer.start(destination);
if (canalMQStarter != null) {
canalMQStarter.startDestination(destination);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processActiveExit() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (canalMQStarter != null) {
canalMQStarter.stopDestination(destination);
}
embeddedCanalServer.stop(destination);
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processStart() {
try {
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
registerIp + ":" + port);
initCid(path);
zkclientx.subscribeStateChanges(new IZkStateListener() {
public void handleStateChanged(KeeperState state) throws Exception {
}
public void handleNewSession() throws Exception {
initCid(path);
}
@Override
public void handleSessionEstablishmentError(Throwable error) throws Exception {
logger.error("failed to connect to zookeeper", error);
}
});
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
public void processStop() {
try {
MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
if (zkclientx != null) {
final String path = ZookeeperPathUtils.getDestinationClusterNode(destination,
registerIp + ":" + port);
releaseCid(path);
}
} finally {
MDC.remove(CanalConstants.MDC_DESTINATION);
}
}
});
if (zkclientx != null) {
runningMonitor.setZkClient(zkclientx);
}
// 触发创建一下cid节点
runningMonitor.init();
return runningMonitor;
}));
// 初始化monitor机制
autoScan = BooleanUtils.toBoolean(getProperty(properties, CanalConstants.CANAL_AUTO_SCAN));
if (autoScan) {
defaultAction = new InstanceAction() {
public void start(String destination) {
InstanceConfig config = instanceConfigs.get(destination);
if (config == null) {
// 重新读取一下instance config
config = parseInstanceConfig(properties, destination);
instanceConfigs.put(destination, config);
}
if (!embeddedCanalServer.isStart(destination)) {
// HA机制启动
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
runningMonitor.start();
}
}
logger.info("auto notify start {} successful.", destination);
}
public void stop(String destination) {
// 此处的stop,代表强制退出,非HA机制,所以需要退出HA的monitor和配置信息
InstanceConfig config = instanceConfigs.remove(destination);
if (config != null) {
embeddedCanalServer.stop(destination);
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (runningMonitor.isStart()) {
runningMonitor.stop();
}
}
logger.info("auto notify stop {} successful.", destination);
}
public void reload(String destination) {
// 目前任何配置变化,直接重启,简单处理
stop(destination);
start(destination);
logger.info("auto notify reload {} successful.", destination);
}
@Override
public void release(String destination) {
// 此处的release,代表强制释放,主要针对HA机制释放运行,让给其他机器抢占
InstanceConfig config = instanceConfigs.get(destination);
if (config != null) {
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (runningMonitor.isStart()) {
boolean release = runningMonitor.release();
if (!release) {
// 如果是单机模式,则直接清除配置
instanceConfigs.remove(destination);
// 停掉服务
runningMonitor.stop();
if (instanceConfigMonitors.containsKey(InstanceConfig.InstanceMode.MANAGER)) {
ManagerInstanceConfigMonitor monitor = (ManagerInstanceConfigMonitor) instanceConfigMonitors.get(InstanceConfig.InstanceMode.MANAGER);
Map<String, InstanceAction> instanceActions = monitor.getActions();
if (instanceActions.containsKey(destination)) {
// 清除内存中的autoScan cache
monitor.release(destination);
}
}
}
}
}
logger.info("auto notify release {} successful.", destination);
}
};
instanceConfigMonitors = MigrateMap.makeComputingMap(mode -> {
int scanInterval = Integer.valueOf(getProperty(properties,
CanalConstants.CANAL_AUTO_SCAN_INTERVAL,
"5"));
if (mode.isSpring()) {
SpringInstanceConfigMonitor monitor = new SpringInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
// 设置conf目录,默认是user.dir + conf目录组成
String rootDir = getProperty(properties, CanalConstants.CANAL_CONF_DIR);
if (StringUtils.isEmpty(rootDir)) {
rootDir = "../conf";
}
if (StringUtils.equals("otter-canal", System.getProperty("appName"))) {
monitor.setRootConf(rootDir);
} else {
// eclipse debug模式
monitor.setRootConf("src/main/resources/");
}
return monitor;
} else if (mode.isManager()) {
ManagerInstanceConfigMonitor monitor = new ManagerInstanceConfigMonitor();
monitor.setScanIntervalInSecond(scanInterval);
monitor.setDefaultAction(defaultAction);
String managerAddress = getProperty(properties, CanalConstants.CANAL_ADMIN_MANAGER);
monitor.setConfigClient(getManagerClient(managerAddress));
return monitor;
} else {
throw new UnsupportedOperationException("unknow mode :" + mode + " for monitor");
}
});
}
}