in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [409:476]
/**
 * Creates every sink named in the agent configuration, attaches each one to
 * its channel, and finally passes the collected sinks to
 * {@code loadSinkGroups}.
 *
 * <p>The sink names are walked twice: once for sinks that have a
 * {@code ComponentConfiguration} entry and once for sinks that only have a
 * {@code Context}. If configuring a sink or wiring it to its channel throws,
 * the error is logged and that sink is left out of the result. Note that the
 * {@code sinkFactory.create} call sits outside the try block, so an exception
 * from the factory is NOT swallowed and propagates to the caller.
 *
 * @param agentConf           source of the sink names, sink configurations
 *                            and sink contexts
 * @param channelComponentMap channel components, looked up by the channel
 *                            name each sink is configured with; the sink
 *                            name is appended to the matching component's
 *                            {@code components} list
 * @param sinkRunnerMap       handed through unchanged to
 *                            {@code loadSinkGroups}
 * @throws InstantiationException not thrown directly by this method;
 *         presumably propagates from {@code sinkFactory.create} or
 *         {@code loadSinkGroups} (neither is visible here -- confirm)
 */
private void loadSinks(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap, Map<String, SinkRunner> sinkRunnerMap)
    throws InstantiationException {
  Set<String> declaredSinks = agentConf.getSinkSet();
  Map<String, ComponentConfiguration> typedConfigs = agentConf.getSinkConfigMap();
  Map<String, Sink> loadedSinks = new HashMap<String, Sink>();

  // Pass 1: sinks described by a ComponentConfiguration object.
  for (String sinkName : declaredSinks) {
    ComponentConfiguration comp = typedConfigs.get(sinkName);
    if (comp == null) {
      continue;
    }
    SinkConfiguration sinkConf = (SinkConfiguration) comp;
    // Deliberately outside the try: a factory failure is not treated as a
    // per-sink configuration error.
    Sink sink = sinkFactory.create(comp.getComponentName(), comp.getType());
    try {
      Configurables.configure(sink, sinkConf);
      ChannelComponent target =
          requireSinkChannel(sinkName, channelComponentMap.get(sinkConf.getChannel()));
      checkSinkChannelCompatibility(sink, target.channel);
      sink.setChannel(target.channel);
      loadedSinks.put(comp.getComponentName(), sink);
      target.components.add(sinkName);
    } catch (Exception e) {
      LOGGER.error(
          String.format("Sink %s has been removed due to an error during configuration",
              sinkName),
          e);
    }
  }

  // Pass 2: sinks that have no ComponentConfiguration and are driven purely
  // by their Context.
  Map<String, Context> contextsBySink = agentConf.getSinkContext();
  for (String sinkName : declaredSinks) {
    Context sinkContext = contextsBySink.get(sinkName);
    if (sinkContext == null) {
      continue;
    }
    String sinkType = sinkContext.getString(BasicConfigurationConstants.CONFIG_TYPE);
    Sink sink = sinkFactory.create(sinkName, sinkType);
    try {
      Configurables.configure(sink, sinkContext);
      // The channel name is read only after configure(), as before.
      String channelName = sinkContext.getString(BasicConfigurationConstants.CONFIG_CHANNEL);
      ChannelComponent target =
          requireSinkChannel(sinkName, channelComponentMap.get(channelName));
      checkSinkChannelCompatibility(sink, target.channel);
      sink.setChannel(target.channel);
      loadedSinks.put(sinkName, sink);
      target.components.add(sinkName);
    } catch (Exception e) {
      LOGGER.error(
          String.format("Sink %s has been removed due to an error during configuration",
              sinkName),
          e);
    }
  }

  loadSinkGroups(agentConf, loadedSinks, sinkRunnerMap);
}

/**
 * Returns the given channel component, or fails if the lookup produced none.
 *
 * @param sinkName         name of the sink being wired, used in the error text
 * @param channelComponent result of looking up the sink's channel; may be null
 * @return {@code channelComponent} when it is non-null
 * @throws IllegalStateException if {@code channelComponent} is null
 */
private ChannelComponent requireSinkChannel(String sinkName, ChannelComponent channelComponent) {
  if (channelComponent != null) {
    return channelComponent;
  }
  throw new IllegalStateException(
      String.format("Sink %s is not connected to a channel", sinkName));
}