in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [478:537]
private void loadSinkGroups(AgentConfiguration agentConf,
Map<String, Sink> sinks, Map<String, SinkRunner> sinkRunnerMap)
throws InstantiationException {
Set<String> sinkGroupNames = agentConf.getSinkgroupSet();
Map<String, ComponentConfiguration> compMap =
agentConf.getSinkGroupConfigMap();
Map<String, String> usedSinks = new HashMap<String, String>();
for (String groupName : sinkGroupNames) {
ComponentConfiguration comp = compMap.get(groupName);
if (comp != null) {
SinkGroupConfiguration groupConf = (SinkGroupConfiguration) comp;
List<Sink> groupSinks = new ArrayList<Sink>();
for (String sink : groupConf.getSinks()) {
Sink s = sinks.remove(sink);
if (s == null) {
String sinkUser = usedSinks.get(sink);
if (sinkUser != null) {
throw new InstantiationException(String.format(
"Sink %s of group %s already " +
"in use by group %s", sink, groupName, sinkUser));
} else {
throw new InstantiationException(String.format(
"Sink %s of group %s does "
+ "not exist or is not properly configured", sink,
groupName));
}
}
groupSinks.add(s);
usedSinks.put(sink, groupName);
}
try {
SinkGroup group = new SinkGroup(groupSinks);
Configurables.configure(group, groupConf);
sinkRunnerMap.put(comp.getComponentName(),
new SinkRunner(group.getProcessor()));
} catch (Exception e) {
String msg = String.format("SinkGroup %s has been removed due to " +
"an error during configuration", groupName);
LOGGER.error(msg, e);
}
}
}
// add any unassigned sinks to solo collectors
for (Entry<String, Sink> entry : sinks.entrySet()) {
if (!usedSinks.containsValue(entry.getKey())) {
try {
SinkProcessor pr = new DefaultSinkProcessor();
List<Sink> sinkMap = new ArrayList<Sink>();
sinkMap.add(entry.getValue());
pr.setSinks(sinkMap);
Configurables.configure(pr, new Context());
sinkRunnerMap.put(entry.getKey(), new SinkRunner(pr));
} catch (Exception e) {
String msg = String.format("SinkGroup %s has been removed due to " +
"an error during configuration", entry.getKey());
LOGGER.error(msg, e);
}
}
}
}