private void loadChannels()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [149:230]


    /**
     * Creates (or re-uses) and configures every channel named in the agent
     * configuration, registering each successfully configured channel in
     * {@code channelComponentMap}, and then drops cache entries for channels
     * that were cached before this call but are still marked as not re-used.
     *
     * <p>A channel whose configuration throws is logged and left out of
     * {@code channelComponentMap}; the remaining channels are still processed.
     *
     * @param agentConf           agent configuration supplying the channel names,
     *                            their {@link ComponentConfiguration}s and their
     *                            {@link Context}s
     * @param channelComponentMap output map, keyed by channel name, that receives
     *                            a {@link ChannelComponent} for each channel that
     *                            was configured without error
     * @throws InstantiationException declared by this method; presumably propagated
     *                                from {@code getOrCreateChannel} (not visible
     *                                here, confirm against that helper)
     */
    private void loadChannels(AgentConfiguration agentConf,
                              Map<String, ChannelComponent> channelComponentMap)
            throws InstantiationException {
        LOGGER.info("Creating channels");

        /*
         * Some channels are re-used across re-configurations. To handle this we
         * record the names of all currently cached channels, perform the
         * reconfiguration, and afterwards delete our reference to any channel
         * that was not used. This supports the scenario where channel "ch0" is
         * enabled, removed and then added back: without it, channels such as
         * the memory channel would cause the first instance's data to show up
         * in the second.
         */
        ListMultimap<Class<? extends Channel>, String> staleCandidates =
                ArrayListMultimap.create();
        // Start by assuming that no cached channel will be re-used.
        for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> cached :
                channelCache.entrySet()) {
            staleCandidates.putAll(cached.getKey(), cached.getValue().keySet());
        }

        Set<String> configuredNames = agentConf.getChannelSet();
        Map<String, ComponentConfiguration> configsByName = agentConf.getChannelConfigMap();

        /*
         * First pass: channels that have a ComponentConfiguration object.
         */
        for (String chName : configuredNames) {
            ComponentConfiguration componentConfig = configsByName.get(chName);
            if (componentConfig == null) {
                continue;
            }
            // Obtained outside the try block on purpose: failures here propagate
            // to the caller instead of being logged and skipped.
            // NOTE(review): getOrCreateChannel presumably un-marks the channel in
            // staleCandidates when it is handed out - confirm in that helper.
            Channel channel = getOrCreateChannel(staleCandidates,
                    componentConfig.getComponentName(), componentConfig.getType());
            try {
                Configurables.configure(channel, componentConfig);
                channelComponentMap.put(componentConfig.getComponentName(),
                        new ChannelComponent(channel));
                LOGGER.info("Created channel " + chName);
            } catch (Exception e) {
                LOGGER.error(String.format(
                        "Channel %s has been removed due to an error during configuration",
                        chName), e);
            }
        }

        /*
         * Second pass: channels that DO NOT have a ComponentConfiguration
         * object and are configured from a Context only.
         */
        for (String chName : configuredNames) {
            Context channelContext = agentConf.getChannelContext().get(chName);
            if (channelContext == null) {
                continue;
            }
            String channelType =
                    channelContext.getString(BasicConfigurationConstants.CONFIG_TYPE);
            Channel channel = getOrCreateChannel(staleCandidates, chName, channelType);
            try {
                Configurables.configure(channel, channelContext);
                channelComponentMap.put(chName, new ChannelComponent(channel));
                LOGGER.info("Created channel " + chName);
            } catch (Exception e) {
                LOGGER.error(String.format(
                        "Channel %s has been removed due to an error during configuration",
                        chName), e);
            }
        }

        /*
         * Any channel still marked as not re-used has its cache reference
         * removed; a per-class cache map that ends up empty is removed as well.
         */
        for (Class<? extends Channel> channelType : staleCandidates.keySet()) {
            Map<String, Channel> cachedOfType = channelCache.get(channelType);
            if (cachedOfType == null) {
                continue;
            }
            for (String staleName : staleCandidates.get(channelType)) {
                Channel dropped = cachedOfType.remove(staleName);
                if (dropped != null) {
                    LOGGER.info("Removed {} of type {}", staleName, channelType);
                }
            }
            if (cachedOfType.isEmpty()) {
                channelCache.remove(channelType);
            }
        }
    }