private void loadSinks()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [409:476]


    /**
     * Creates, configures and wires every sink named in the agent configuration, then
     * passes the resulting sinks on to {@code loadSinkGroups}.
     *
     * <p>The set of sink names is walked twice: first for sinks that have an entry in the
     * sink configuration map (a {@code ComponentConfiguration}), then for sinks that have
     * an entry in the sink context map (a plain {@code Context}). In both passes the sink
     * is configured, the channel it names is looked up in {@code channelComponentMap},
     * compatibility is checked, the channel is set on the sink, and the sink name is added
     * to the channel component's {@code components} collection.
     *
     * <p>Any exception raised while configuring or wiring a sink is logged and that sink
     * is left out of the map handed to {@code loadSinkGroups}; processing continues with
     * the next name. The {@code sinkFactory.create} calls are made outside the
     * {@code try} blocks, so an exception thrown by the factory is not caught here.
     *
     * @param agentConf           source of the sink names, sink configurations and sink contexts
     * @param channelComponentMap channel components looked up by the channel each sink names
     * @param sinkRunnerMap       passed through unchanged to {@code loadSinkGroups}
     * @throws InstantiationException not raised by a statement in this method itself;
     *                                presumably propagated from a callee such as
     *                                {@code loadSinkGroups} (confirm against that method)
     */
    private void loadSinks(AgentConfiguration agentConf,
                           Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
            throws InstantiationException {
        final Set<String> declaredSinkNames = agentConf.getSinkSet();
        final Map<String, ComponentConfiguration> sinkConfigsByName = agentConf.getSinkConfigMap();
        final Map<String, Sink> wiredSinks = new HashMap<String, Sink>();

        // First pass: sinks that are described by a ComponentConfiguration object.
        for (String sinkName : declaredSinkNames) {
            ComponentConfiguration rawConfig = sinkConfigsByName.get(sinkName);
            if (rawConfig == null) {
                continue;
            }
            SinkConfiguration sinkConfig = (SinkConfiguration) rawConfig;
            // Instantiation happens outside the try block, so a factory failure propagates.
            Sink sink = sinkFactory.create(rawConfig.getComponentName(), rawConfig.getType());
            try {
                Configurables.configure(sink, sinkConfig);
                ChannelComponent target = channelComponentMap.get(sinkConfig.getChannel());
                if (target == null) {
                    // Thrown inside the try so it is reported through the catch below.
                    throw new IllegalStateException(
                            String.format("Sink %s is not connected to a channel", sinkName));
                }
                checkSinkChannelCompatibility(sink, target.channel);
                sink.setChannel(target.channel);
                wiredSinks.put(rawConfig.getComponentName(), sink);
                target.components.add(sinkName);
            } catch (Exception e) {
                LOGGER.error(String.format(
                        "Sink %s has been removed due to an error during configuration", sinkName), e);
            }
        }

        // Second pass: sinks that have no ComponentConfiguration object and are
        // configured from a Context only.
        final Map<String, Context> sinkContextsByName = agentConf.getSinkContext();
        for (String sinkName : declaredSinkNames) {
            Context sinkContext = sinkContextsByName.get(sinkName);
            if (sinkContext == null) {
                continue;
            }
            String sinkType = sinkContext.getString(BasicConfigurationConstants.CONFIG_TYPE);
            // Instantiation happens outside the try block, so a factory failure propagates.
            Sink sink = sinkFactory.create(sinkName, sinkType);
            try {
                Configurables.configure(sink, sinkContext);
                String channelName = sinkContext.getString(BasicConfigurationConstants.CONFIG_CHANNEL);
                ChannelComponent target = channelComponentMap.get(channelName);
                if (target == null) {
                    // Thrown inside the try so it is reported through the catch below.
                    throw new IllegalStateException(
                            String.format("Sink %s is not connected to a channel", sinkName));
                }
                checkSinkChannelCompatibility(sink, target.channel);
                sink.setChannel(target.channel);
                wiredSinks.put(sinkName, sink);
                target.components.add(sinkName);
            } catch (Exception e) {
                LOGGER.error(String.format(
                        "Sink %s has been removed due to an error during configuration", sinkName), e);
            }
        }

        loadSinkGroups(agentConf, wiredSinks, sinkRunnerMap);
    }