private static StandaloneConnectController createConnectController()

in rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/StandaloneConnectStartup.java [91:196]


    /**
     * Parses the command line, loads the worker (and optional metrics) configuration,
     * configures logback, instantiates the standalone management services and wires
     * them into a {@link StandaloneConnectController}.
     *
     * <p>Side effects visible in this method: assigns the static {@code commandLine},
     * {@code configFile} and {@code properties} fields, resets and reconfigures the
     * logback {@code LoggerContext}, and registers a JVM shutdown hook that calls
     * {@code controller.shutdown()}.
     *
     * <p>The JVM is terminated with exit code {@code -1} when command line parsing yields
     * {@code null} or when any {@link Throwable} is raised during setup, and with
     * {@code -2} when {@code config.getConnectHome()} is {@code null}.
     *
     * @param args the raw command line arguments; option {@code c} names the worker config file
     * @return the wired controller; the trailing {@code return null} only exists to satisfy
     *         the compiler after {@code System.exit}
     */
    private static StandaloneConnectController createConnectController(String[] args) {
        try {
            // Build the command line options.
            Options options = ServerUtil.buildCommandlineOptions(new Options());
            commandLine = ServerUtil.parseCmdLine("connect", args, buildCommandlineOptions(options),
                    new PosixParser());
            if (null == commandLine) {
                System.exit(-1);
            }

            // Load configs from command line.
            StandaloneConfig config = new StandaloneConfig();
            if (commandLine.hasOption('c')) {
                // Null-check BEFORE trimming so the guard is meaningful.
                String file = commandLine.getOptionValue('c');
                if (file != null) {
                    file = file.trim();
                    configFile = file;
                    // try-with-resources closes the stream even when load() throws.
                    try (InputStream in = new BufferedInputStream(new FileInputStream(file))) {
                        properties = new Properties();
                        properties.load(in);
                    }
                    FileAndPropertyUtil.properties2Object(properties, config);
                }
            }

            String metricsConfigPath = config.getMetricsConfigPath();
            if (StringUtils.isNotEmpty(metricsConfigPath)) {
                // NOTE(review): this replaces the static `properties` loaded from the worker
                // config file above with the metrics properties — confirm that is intended.
                try (InputStream in = new BufferedInputStream(new FileInputStream(metricsConfigPath))) {
                    properties = new Properties();
                    properties.load(in);
                }
                // The reporter class name is mandatory: it becomes the key of the metrics config map.
                // (Hashtable#contains searches values, so it must not be used for this key lookup.)
                String metricsReporter = properties.getProperty(WorkerConfig.METRIC_CLASS);
                if (!StringUtils.isNotEmpty(metricsReporter)) {
                    throw new IllegalArgumentException("[metrics.reporter] is empty");
                }
                Map<String, String> metricsConfig = new ConcurrentHashMap<>();
                for (Map.Entry<Object, Object> entry : properties.entrySet()) {
                    // The reporter class itself is the map key, not one of its settings.
                    if (entry.getKey().equals(WorkerConfig.METRIC_CLASS)) {
                        continue;
                    }
                    metricsConfig.put(entry.getKey().toString(), entry.getValue().toString());
                }
                config.getMetricsConfig().put(metricsReporter, metricsConfig);
            }

            if (null == config.getConnectHome()) {
                System.out.printf("Please set the %s variable in your environment to match the location of the Connect installation", WorkerConfig.CONNECT_HOME_ENV);
                System.exit(-2);
            }

            // Re-initialise logback from the logback.xml under the connect home directory.
            LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
            JoranConfigurator configurator = new JoranConfigurator();
            configurator.setContext(lc);
            lc.reset();
            configurator.doConfigure(config.getConnectHome() + "/conf/logback.xml");

            // Plugin paths are a comma separated list; empty segments are skipped.
            List<String> pluginPaths = new ArrayList<>(16);
            if (StringUtils.isNotEmpty(config.getPluginPaths())) {
                String[] strArr = config.getPluginPaths().split(",");
                for (String path : strArr) {
                    if (StringUtils.isNotEmpty(path)) {
                        pluginPaths.add(path);
                    }
                }
            }
            Plugin plugin = new Plugin(pluginPaths);
            ClusterManagementService clusterManagementService = new MemoryClusterManagementServiceImpl();
            clusterManagementService.initialize(config);
            ConfigManagementService configManagementService = new MemoryConfigManagementServiceImpl();
            configManagementService.initialize(config, null, plugin);
            PositionManagementService positionManagementServices = new FilePositionManagementServiceImpl();
            positionManagementServices.initialize(config, null, null);
            StateManagementService stateManagementService = new MemoryStateManagementServiceImpl();
            stateManagementService.initialize(config, null);
            StandaloneConnectController controller = new StandaloneConnectController(
                    plugin,
                    config,
                    clusterManagementService,
                    configManagementService,
                    positionManagementServices,
                    stateManagementService);
            // Invoked when shutdown.
            Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                private volatile boolean hasShutdown = false;
                private final AtomicInteger shutdownTimes = new AtomicInteger(0);

                @Override
                public void run() {
                    synchronized (this) {
                        log.info("Shutdown hook was invoked, {}", this.shutdownTimes.incrementAndGet());
                        // Guard so the controller is shut down at most once by this hook.
                        if (!this.hasShutdown) {
                            this.hasShutdown = true;
                            long beginTime = System.currentTimeMillis();
                            controller.shutdown();
                            long consumingTimeTotal = System.currentTimeMillis() - beginTime;
                            log.info("Shutdown hook over, consuming total time(ms): {}", consumingTimeTotal);
                        }
                    }
                }
            }, "ShutdownHook"));
            return controller;

        } catch (Throwable e) {
            // Logback may not be configured yet at this point, so report on stderr.
            e.printStackTrace();
            System.exit(-1);
        }
        return null;
    }