private void loadSinkGroups()

in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [478:537]


    private void loadSinkGroups(AgentConfiguration agentConf,
                                Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
            throws InstantiationException {
        Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
        Map<String, ComponentConfiguration> compMap =
                agentConf.getSinkGroupConfigMap();
        Map<String, String> usedSinks = new HashMap<String, String>();
        for (String groupName : sinkGroupNames) {
            ComponentConfiguration comp = compMap.get(groupName);
            if (comp != null) {
                SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
                List<Sink> groupSinks = new ArrayList<Sink>();
                for (String sink : groupConf.getSinks()) {
                    Sink s = sinks.remove(sink);
                    if (s == null) {
                        String sinkUser = usedSinks.get(sink);
                        if (sinkUser != null) {
                            throw new InstantiationException(String.format(
                                    "Sink %s of group %s already " +
                                            "in use by group %s", sink, groupName, sinkUser));
                        } else {
                            throw new InstantiationException(String.format(
                                    "Sink %s of group %s does "
                                            + "not exist or is not properly configured", sink,
                                    groupName));
                        }
                    }
                    groupSinks.add(s);
                    usedSinks.put(sink, groupName);
                }
                try {
                    SinkGroup group = new SinkGroup(groupSinks);
                    Configurables.configure(group, groupConf);
                    sinkRunnerMap.put(comp.getComponentName(),
                            new SinkRunner(group.getProcessor()));
                } catch (Exception e) {
                    String msg = String.format("SinkGroup %s has been removed due to " +
                            "an error during configuration", groupName);
                    LOGGER.error(msg, e);
                }
            }
        }
        // add any unassigned sinks to solo collectors
        for (Entry<String, Sink> entry : sinks.entrySet()) {
            if (!usedSinks.containsValue(entry.getKey())) {
                try {
                    SinkProcessor pr = new DefaultSinkProcessor();
                    List<Sink> sinkMap = new ArrayList<Sink>();
                    sinkMap.add(entry.getValue());
                    pr.setSinks(sinkMap);
                    Configurables.configure(pr, new Context());
                    sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
                } catch (Exception e) {
                    String msg = String.format("SinkGroup %s has been removed due to " +
                            "an error during configuration", entry.getKey());
                    LOGGER.error(msg, e);
                }
            }
        }
    }