in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [261:360]
/**
 * Instantiates, configures and wires every source named by the agent configuration.
 *
 * <p>Two passes are made over {@code agentConf.getSourceSet()}:
 * <ol>
 *   <li>sources that have an entry in {@code agentConf.getSourceConfigMap()} are
 *       configured from their {@link SourceConfiguration};</li>
 *   <li>sources that have an entry in {@code agentConf.getSourceContext()} are
 *       configured from their raw {@link Context}.</li>
 * </ol>
 * For each source the channels are resolved through {@code getSourceChannels}, a
 * {@link ChannelSelector} and {@link ChannelProcessor} are built, a
 * {@link SourceRunner} is stored in {@code sourceRunnerMap}, and the source name is
 * added to the {@code components} collection of every channel it is connected to.
 *
 * <p>Any exception raised while configuring or wiring a single source is logged and
 * that source is skipped; the remaining sources are still processed. The calls to
 * {@code sourceFactory.create} sit outside that per-source handling, so whatever they
 * throw propagates to the caller.
 *
 * @param agentConf configuration providing the source names, typed configurations
 *     and contexts
 * @param channelComponentMap channel components, looked up here by
 *     {@code channel.getName()}; updated with the names of the connected sources
 * @param sourceRunnerMap output map that receives one runner per successfully wired
 *     source
 * @throws InstantiationException declared for failures that escape the per-source
 *     handling (presumably from source creation -- confirm against the factory)
 */
private void loadSources(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap,
    Map<String, SourceRunner> sourceRunnerMap)
    throws InstantiationException {
  Set<String> sourceNames = agentConf.getSourceSet();

  /*
   * Components which have a ComponentConfiguration object
   */
  Map<String, ComponentConfiguration> compMap = agentConf.getSourceConfigMap();
  for (String sourceName : sourceNames) {
    ComponentConfiguration comp = compMap.get(sourceName);
    if (comp == null) {
      continue;
    }
    SourceConfiguration config = (SourceConfiguration) comp;
    Source source = sourceFactory.create(comp.getComponentName(), comp.getType());
    try {
      Configurables.configure(source, config);
      List<Channel> sourceChannels =
          getSourceChannels(channelComponentMap, source, config.getChannels());
      if (sourceChannels.isEmpty()) {
        throw sourceNotConnected(sourceName);
      }
      ChannelSelector selector = ChannelSelectorFactory.create(
          sourceChannels, config.getSelectorConfiguration());
      ChannelProcessor channelProcessor = new ChannelProcessor(selector);
      Configurables.configure(channelProcessor, config);
      // The runner is keyed by the component's own name, as before; the channel
      // membership is recorded under the name taken from the source set.
      registerConfiguredSource(comp.getComponentName(), sourceName, source,
          channelProcessor, sourceChannels, channelComponentMap, sourceRunnerMap);
    } catch (Exception e) {
      String msg = String.format("Source %s has been removed due to an " +
          "error during configuration", sourceName);
      LOGGER.error(msg, e);
    }
  }

  /*
   * Components which DO NOT have a ComponentConfiguration object
   * and use only Context
   */
  Map<String, Context> sourceContexts = agentConf.getSourceContext();
  for (String sourceName : sourceNames) {
    Context context = sourceContexts.get(sourceName);
    if (context == null) {
      continue;
    }
    Source source = sourceFactory.create(sourceName,
        context.getString(BasicConfigurationConstants.CONFIG_TYPE));
    try {
      Configurables.configure(source, context);
      String channelList =
          context.getString(BasicConfigurationConstants.CONFIG_CHANNELS);
      if (channelList == null) {
        // A missing channels property means the source has no channel at all;
        // report that explicitly instead of failing with a bare NullPointerException.
        throw sourceNotConnected(sourceName);
      }
      String[] channelNames = channelList.split("\\s+");
      List<Channel> sourceChannels =
          getSourceChannels(channelComponentMap, source, Arrays.asList(channelNames));
      if (sourceChannels.isEmpty()) {
        throw sourceNotConnected(sourceName);
      }
      Map<String, String> selectorConfig = context.getSubProperties(
          BasicConfigurationConstants.CONFIG_SOURCE_CHANNELSELECTOR_PREFIX);
      ChannelSelector selector = ChannelSelectorFactory.create(
          sourceChannels, selectorConfig);
      ChannelProcessor channelProcessor = new ChannelProcessor(selector);
      Configurables.configure(channelProcessor, context);
      registerConfiguredSource(sourceName, sourceName, source,
          channelProcessor, sourceChannels, channelComponentMap, sourceRunnerMap);
    } catch (Exception e) {
      String msg = String.format("Source %s has been removed due to an " +
          "error during configuration", sourceName);
      LOGGER.error(msg, e);
    }
  }
}

/**
 * Builds the exception reported when a source ends up with no usable channel.
 *
 * @param sourceName name of the offending source, included in the message
 * @return the exception for the caller to throw
 */
private static IllegalStateException sourceNotConnected(String sourceName) {
  return new IllegalStateException(String.format("Source %s is not connected to a " +
      "channel", sourceName));
}

/**
 * Attaches the channel processor to a configured source and records the source in
 * both the runner map and the channel membership collections.
 *
 * <p>Every channel lookup is validated before anything is mutated, so a failed
 * lookup leaves neither a runner nor a partial channel membership behind for a
 * source that the caller then reports as removed.
 *
 * @param runnerKey key under which the runner is stored in {@code sourceRunnerMap}
 * @param sourceName name added to each connected channel's {@code components}
 * @param source the configured source
 * @param channelProcessor processor to attach to the source
 * @param sourceChannels channels the source is connected to
 * @param channelComponentMap channel components looked up by {@code channel.getName()}
 * @param sourceRunnerMap map receiving the new runner
 * @throws NullPointerException if a channel has no entry in {@code channelComponentMap}
 */
private void registerConfiguredSource(String runnerKey, String sourceName,
    Source source, ChannelProcessor channelProcessor, List<Channel> sourceChannels,
    Map<String, ChannelComponent> channelComponentMap,
    Map<String, SourceRunner> sourceRunnerMap) {
  for (Channel channel : sourceChannels) {
    Preconditions.checkNotNull(channelComponentMap.get(channel.getName()),
        "Channel %s", channel.getName());
  }
  source.setChannelProcessor(channelProcessor);
  sourceRunnerMap.put(runnerKey, SourceRunner.forSource(source));
  for (Channel channel : sourceChannels) {
    channelComponentMap.get(channel.getName()).components.add(sourceName);
  }
}