public void StartConnector()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/FlumeConnector.java [42:117]


    /**
     * Builds the Flume {@code Application} described by {@code flumeConfig}, starts it and
     * registers a JVM shutdown hook that stops it.
     *
     * <p>Configuration source selection:
     * <ul>
     *   <li>If a non-empty ZooKeeper connection string is set, a ZooKeeper configuration
     *       provider is created from the connection string and base path.</li>
     *   <li>Otherwise the properties file named by the config is used. If that file does
     *       not exist and the {@code Constants.SYSPROP_CALLED_FROM_SERVICE} system property
     *       is not set, startup fails.</li>
     * </ul>
     *
     * <p>Unless reloading is disabled, a polling provider is created together with an
     * {@code EventBus} on which the application is registered; with reloading disabled the
     * configuration is fetched once and handed to the application directly.
     *
     * <p>NOTE(review): a new shutdown hook is registered on every call and is never removed;
     * callers are presumably expected to invoke this once per connector instance - confirm.
     *
     * @param flumeConfig connector settings (agent name, reload flag, ZooKeeper connection
     *                    string and base path, configuration file path); must not be null
     * @throws ParseException if neither a ZooKeeper connection string nor a configuration
     *                        file is specified, or if the configuration file does not exist
     *                        and the process was not launched as a service
     * @throws Exception if SSL initialisation, provider construction or application startup
     *                   fails
     */
    public void StartConnector(FlumeConfig flumeConfig) throws Exception {
        SSLUtil.initGlobalSSLParameters();
        String agentName = flumeConfig.getName();
        // Boolean.TRUE.equals(...) keeps reloading enabled when the flag is unset, instead of
        // failing while unboxing a null value (works whether the getter returns boolean or Boolean).
        boolean reload = !Boolean.TRUE.equals(flumeConfig.getNoReloadConf());
        // A missing (null) connection string is treated the same as an empty one.
        String zkConnectionStr = flumeConfig.getZkConnString();
        boolean isZkConfigured = zkConnectionStr != null && !zkConnectionStr.isEmpty();

        if (isZkConfigured) {
            String baseZkPath = flumeConfig.getZkBasePath();
            if (reload) {
                EventBus eventBus = new EventBus(agentName + "-event-bus");
                List<LifecycleAware> components = Lists.newArrayList();
                PollingZooKeeperConfigurationProvider zookeeperConfigurationProvider =
                        new PollingZooKeeperConfigurationProvider(
                                agentName, zkConnectionStr, baseZkPath, eventBus);
                components.add(zookeeperConfigurationProvider);
                application = new Application(components);
                eventBus.register(application);
            } else {
                StaticZooKeeperConfigurationProvider zookeeperConfigurationProvider =
                        new StaticZooKeeperConfigurationProvider(
                                agentName, zkConnectionStr, baseZkPath);
                application = new Application();
                application.handleConfigurationEvent(zookeeperConfigurationProvider.getConfiguration());
            }
        } else {
            String confFilePath = flumeConfig.getConfFile();
            if (confFilePath == null) {
                // Without this check new File(null) would raise a bare NullPointerException.
                throw new ParseException(
                        "Neither a ZooKeeper connection string nor a configuration file was specified");
            }
            File configurationFile = new File(confFilePath);
            /*
             * The following is to ensure that by default the agent will fail on
             * startup if the file does not exist.
             */
            if (!configurationFile.exists()
                    && System.getProperty(Constants.SYSPROP_CALLED_FROM_SERVICE) == null) {
                // If command line invocation, then need to fail fast
                String path = configurationFile.getPath();
                try {
                    path = configurationFile.getCanonicalPath();
                } catch (IOException ex) {
                    // Best effort: fall back to the non-canonical path in the error message.
                    log.error("Failed to read canonical path for file: " + path,
                            ex);
                }
                throw new ParseException("The specified configuration file does not exist: " + path);
            }

            if (reload) {
                EventBus eventBus = new EventBus(agentName + "-event-bus");
                List<LifecycleAware> components = Lists.newArrayList();
                // NOTE(review): 30 is presumably the polling interval in seconds - confirm
                // against the PollingPropertiesFileConfigurationProvider constructor.
                PollingPropertiesFileConfigurationProvider configurationProvider =
                        new PollingPropertiesFileConfigurationProvider(
                                agentName, configurationFile, eventBus, 30);
                components.add(configurationProvider);
                application = new Application(components);
                eventBus.register(application);
            } else {
                PropertiesFileConfigurationProvider configurationProvider =
                        new PropertiesFileConfigurationProvider(agentName, configurationFile);
                application = new Application();
                application.handleConfigurationEvent(configurationProvider.getConfiguration());
            }
        }
        application.start();

        // Capture the instance started here so the hook stops this application even if the
        // field is reassigned later.
        final Application appReference = application;
        Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {
            @Override
            public void run() {
                appReference.stop();
            }
        });
    }