private void loadSources()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [261:360]


    /**
     * Creates, configures and registers every source named in the agent configuration.
     *
     * <p>The names from {@code agentConf.getSourceSet()} are walked twice: once for sources
     * that have an entry in {@code agentConf.getSourceConfigMap()}, and once for sources that
     * have an entry in {@code agentConf.getSourceContext()}. For each source found, the method
     * configures it, resolves its channels via {@code getSourceChannels}, builds a
     * {@link ChannelSelector} and {@link ChannelProcessor}, stores a {@link SourceRunner} in
     * {@code sourceRunnerMap}, and appends the source name to the {@code components} list of
     * every connected {@link ChannelComponent}.
     *
     * <p>Any exception raised while configuring or wiring a single source is logged and that
     * source is skipped; the remaining sources are still processed. The call to
     * {@code sourceFactory.create} (and, in the first pass, the cast to
     * {@link SourceConfiguration}) sits outside that per-source try/catch, so a failure there
     * propagates to the caller.
     *
     * @param agentConf           configuration supplying the source names, component
     *                            configurations and contexts
     * @param channelComponentMap channel components keyed by name; updated with the names of
     *                            the sources attached to each channel
     * @param sourceRunnerMap     output map that receives one runner per successfully wired source
     * @throws InstantiationException declared by the signature; failures inside the per-source
     *                                try block are caught and logged rather than rethrown
     */
    private void loadSources(AgentConfiguration agentConf,
                             Map<String, ChannelComponent> channelComponentMap,
                             Map<String, SourceRunner> sourceRunnerMap)
            throws InstantiationException {

        Set<String> sourceNames = agentConf.getSourceSet();
        Map<String, ComponentConfiguration> compMap =
                agentConf.getSourceConfigMap();
        /*
         * Components which have a ComponentConfiguration object
         */
        for (String sourceName : sourceNames) {
            ComponentConfiguration comp = compMap.get(sourceName);
            if (comp == null) {
                continue;
            }
            SourceConfiguration config = (SourceConfiguration) comp;

            // Deliberately outside the try block: a factory failure is not swallowed here.
            Source source = sourceFactory.create(comp.getComponentName(),
                    comp.getType());
            try {
                Configurables.configure(source, config);
                Set<String> channelNames = config.getChannels();
                List<Channel> sourceChannels =
                        getSourceChannels(channelComponentMap, source, channelNames);
                requireConnected(sourceName, sourceChannels);

                ChannelSelectorConfiguration selectorConfig =
                        config.getSelectorConfiguration();
                ChannelSelector selector = ChannelSelectorFactory.create(
                        sourceChannels, selectorConfig);

                ChannelProcessor channelProcessor = new ChannelProcessor(selector);
                Configurables.configure(channelProcessor, config);
                source.setChannelProcessor(channelProcessor);

                // The runner is keyed by the component's own name, exactly as before.
                registerSource(comp.getComponentName(), sourceName, source,
                        sourceChannels, channelComponentMap, sourceRunnerMap);
            } catch (Exception e) {
                logRemovedSource(sourceName, e);
            }
        }
        /*
         * Components which DO NOT have a ComponentConfiguration object
         * and use only Context
         */
        Map<String, Context> sourceContexts = agentConf.getSourceContext();
        for (String sourceName : sourceNames) {
            Context context = sourceContexts.get(sourceName);
            if (context == null) {
                continue;
            }
            // Deliberately outside the try block: a factory failure is not swallowed here.
            Source source =
                    sourceFactory.create(sourceName,
                            context.getString(BasicConfigurationConstants.CONFIG_TYPE));
            try {
                Configurables.configure(source, context);
                String channelList = context.getString(
                        BasicConfigurationConstants.CONFIG_CHANNELS);
                if (channelList == null) {
                    // No channels property at all: report it as a missing connection
                    // instead of letting split() fail with a NullPointerException.
                    throw notConnected(sourceName);
                }
                String[] channelNames = channelList.split("\\s+");
                List<Channel> sourceChannels =
                        getSourceChannels(channelComponentMap, source, Arrays.asList(channelNames));
                requireConnected(sourceName, sourceChannels);

                Map<String, String> selectorConfig = context.getSubProperties(
                        BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
                ChannelSelector selector = ChannelSelectorFactory.create(
                        sourceChannels, selectorConfig);

                ChannelProcessor channelProcessor = new ChannelProcessor(selector);
                Configurables.configure(channelProcessor, context);
                source.setChannelProcessor(channelProcessor);

                registerSource(sourceName, sourceName, source,
                        sourceChannels, channelComponentMap, sourceRunnerMap);
            } catch (Exception e) {
                logRemovedSource(sourceName, e);
            }
        }
    }

    /** Builds the exception used when a source has no usable channel. */
    private static IllegalStateException notConnected(String sourceName) {
        String msg = String.format("Source %s is not connected to a " +
                "channel", sourceName);
        return new IllegalStateException(msg);
    }

    /** Throws {@link IllegalStateException} when the resolved channel list is empty. */
    private static void requireConnected(String sourceName, List<Channel> sourceChannels) {
        if (sourceChannels.isEmpty()) {
            throw notConnected(sourceName);
        }
    }

    /**
     * Stores the runner for {@code source} under {@code runnerKey} and records
     * {@code sourceName} on every connected channel component.
     *
     * <p>All channel component lookups are validated before anything is mutated, so a failed
     * lookup leaves both {@code sourceRunnerMap} and the channel components untouched.
     */
    private void registerSource(String runnerKey, String sourceName, Source source,
                                List<Channel> sourceChannels,
                                Map<String, ChannelComponent> channelComponentMap,
                                Map<String, SourceRunner> sourceRunnerMap) {
        for (Channel channel : sourceChannels) {
            Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
                    String.format("Channel %s", channel.getName()));
        }
        sourceRunnerMap.put(runnerKey, SourceRunner.forSource(source));
        for (Channel channel : sourceChannels) {
            channelComponentMap.get(channel.getName()).components.add(sourceName);
        }
    }

    /** Logs that a source was dropped because configuring or wiring it failed. */
    private static void logRemovedSource(String sourceName, Exception e) {
        String msg = String.format("Source %s has been removed due to an " +
                "error during configuration", sourceName);
        LOGGER.error(msg, e);
    }