in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java [91:196]
/**
 * Builds a {@link StandaloneConnectController} from the command line arguments.
 *
 * <p>The method parses the command line, optionally loads the worker configuration from the
 * file given with {@code -c}, optionally loads the metrics reporter configuration, configures
 * logback from {@code <connectHome>/conf/logback.xml}, creates and initializes the management
 * services, and registers a JVM shutdown hook that shuts the controller down.
 *
 * <p>The process is terminated with {@code System.exit(-1)} when the command line cannot be
 * parsed or when any step throws, and with {@code System.exit(-2)} when no connect home is
 * configured.
 *
 * @param args the raw command line arguments
 * @return the created controller; the trailing {@code return null} only satisfies the compiler
 *         because the failure path calls {@code System.exit}
 */
private static StandaloneConnectController createConnectController(String[] args) {
    try {
        // Build the command line options.
        Options options = ServerUtil.buildCommandlineOptions(new Options());
        commandLine = ServerUtil.parseCmdLine("connect", args, buildCommandlineOptions(options),
            new PosixParser());
        if (null == commandLine) {
            System.exit(-1);
        }

        // Load configs from command line.
        StandaloneConfig config = new StandaloneConfig();
        if (commandLine.hasOption('c')) {
            // Check for null before trimming; trimming first would make the check pointless.
            String file = commandLine.getOptionValue('c');
            if (file != null) {
                file = file.trim();
                configFile = file;
                // try-with-resources closes the stream even when loading or mapping fails.
                try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
                    properties = new Properties();
                    properties.load(in);
                    FileAndPropertyUtil.properties2Object(properties, config);
                }
            }
        }

        // Load the optional metrics reporter configuration.
        if (StringUtils.isNotEmpty(config.getMetricsConfigPath())) {
            String file = config.getMetricsConfigPath();
            try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
                // NOTE(review): this reassigns the shared `properties` variable that the
                // '-c' branch above also writes, replacing the worker properties. Kept
                // as in the original; confirm that this is intended.
                properties = new Properties();
                properties.load(in);

                // The reporter class is mandatory: it is the key under which the remaining
                // entries are registered. Properties.contains() tests values, not keys, so
                // the property is read and checked directly.
                String reporterClass = properties.getProperty(WorkerConfig.METRIC_CLASS);
                if (StringUtils.isEmpty(reporterClass)) {
                    throw new IllegalArgumentException("[metrics.reporter] is empty");
                }

                Map<String, String> metricsConfig = new ConcurrentHashMap<>();
                for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                    // The reporter class itself is the outer key, not a reporter setting.
                    if (entry.getKey().equals(WorkerConfig.METRIC_CLASS)) {
                        continue;
                    }
                    metricsConfig.put(entry.getKey().toString(), entry.getValue().toString());
                }
                config.getMetricsConfig().put(reporterClass, metricsConfig);
            }
        }

        if (null == config.getConnectHome()) {
            System.out.printf("Please set the %s variable in your environment to match the location of the Connect installation", WorkerConfig.CONNECT_HOME_ENV);
            System.exit(-2);
        }

        // Reset logback and configure it from the file under the connect home.
        LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
        JoranConfigurator configurator = new JoranConfigurator();
        configurator.setContext(lc);
        lc.reset();
        configurator.doConfigure(config.getConnectHome() + "/conf/logback.xml");

        // Plugin paths are a comma separated list; empty segments are skipped.
        List<String> pluginPaths = new ArrayList<>(16);
        if (StringUtils.isNotEmpty(config.getPluginPaths())) {
            String[] strArr = config.getPluginPaths().split(",");
            for (String path : strArr) {
                if (StringUtils.isNotEmpty(path)) {
                    pluginPaths.add(path);
                }
            }
        }
        Plugin plugin = new Plugin(pluginPaths);

        // Create and initialize the management services used by the controller.
        ClusterManagementService clusterManagementService = new MemoryClusterManagementServiceImpl();
        clusterManagementService.initialize(config);
        ConfigManagementService configManagementService = new MemoryConfigManagementServiceImpl();
        configManagementService.initialize(config, null, plugin);
        PositionManagementService positionManagementServices = new FilePositionManagementServiceImpl();
        positionManagementServices.initialize(config, null, null);
        StateManagementService stateManagementService = new MemoryStateManagementServiceImpl();
        stateManagementService.initialize(config, null);

        StandaloneConnectController controller = new StandaloneConnectController(
            plugin,
            config,
            clusterManagementService,
            configManagementService,
            positionManagementServices,
            stateManagementService);

        // Invoked when shutdown.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private final AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                    // The flag ensures controller.shutdown() is called at most once by this hook.
                    if (!this.hasShutdown) {
                        this.hasShutdown = true;
                        long beginTime = System.currentTimeMillis();
                        controller.shutdown();
                        long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                        log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                    }
                }
            }
        }, "ShutdownHook"));
        return controller;
    } catch (Throwable e) {
        // Any failure during startup is fatal: print the cause and terminate the process.
        e.printStackTrace();
        System.exit(-1);
    }
    return null;
}