public MaterializedConfiguration getConfiguration()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [96:143]


    public MaterializedConfiguration getConfiguration() {
        MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
        FlumeConfiguration fconfig = getFlumeConfiguration();
        AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
        if (agentConf != null) {
            Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
            Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
            Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
            try {
                loadChannels(agentConf, channelComponentMap);
                loadSources(agentConf, channelComponentMap, sourceRunnerMap);
                loadSinks(agentConf, channelComponentMap, sinkRunnerMap);
                Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
                for (String channelName : channelNames) {
                    ChannelComponent channelComponent = channelComponentMap.get(channelName);
                    if (channelComponent.components.isEmpty()) {
                        LOGGER.warn("Channel {} has no components connected" +
                                " and has been removed.", channelName);
                        channelComponentMap.remove(channelName);
                        Map<String, Channel> nameChannelMap =
                                channelCache.get(channelComponent.channel.getClass());
                        if (nameChannelMap != null) {
                            nameChannelMap.remove(channelName);
                        }
                    } else {
                        LOGGER.info("Channel {} connected to {}",
                                channelName, channelComponent.components.toString());
                        conf.addChannel(channelName, channelComponent.channel);
                    }
                }
                for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
                    conf.addSourceRunner(entry.getKey(), entry.getValue());
                }
                for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
                    conf.addSinkRunner(entry.getKey(), entry.getValue());
                }
            } catch (InstantiationException ex) {
                LOGGER.error("Failed to instantiate component", ex);
            } finally {
                channelComponentMap.clear();
                sourceRunnerMap.clear();
                sinkRunnerMap.clear();
            }
        } else {
            LOGGER.warn("No configuration found for this host:{}", getAgentName());
        }
        return conf;
    }