in pulsar-io/flume/src/main/java/org/apache/pulsar/io/flume/node/AbstractConfigurationProvider.java [149:230]
/**
 * Obtains and configures every channel named in the agent configuration and
 * registers each successfully configured channel in
 * {@code channelComponentMap}, keyed by channel name.
 *
 * <p>Some channels are reused across re-configurations. To handle this, the
 * names of all currently cached channels are recorded up front, the
 * reconfiguration is performed, and any cached channel that was not used is
 * then dropped from {@code channelCache}. This supports the scenario where
 * channel "ch0" is enabled, removed, and later added back: without the
 * clean-up, a channel such as the memory channel would cause the first
 * instance's data to show up in the second.
 *
 * <p>A channel whose configuration throws is logged and left out of
 * {@code channelComponentMap}; processing continues with the next channel.
 *
 * @param agentConf agent configuration supplying the channel names, the
 *     per-channel {@code ComponentConfiguration} map and the per-channel
 *     {@code Context} map
 * @param channelComponentMap output map that receives a
 *     {@code ChannelComponent} for every channel configured without error
 * @throws InstantiationException declared by this method; presumably raised
 *     when a channel cannot be instantiated - TODO confirm against
 *     getOrCreateChannel, which is called outside the try blocks
 */
private void loadChannels(AgentConfiguration agentConf,
    Map<String, ChannelComponent> channelComponentMap)
    throws InstantiationException {
  LOGGER.info("Creating channels");

  ListMultimap<Class<? extends Channel>, String> channelsNotReused =
      ArrayListMultimap.create();
  // Assume no cached channel will be re-used; getOrCreateChannel is handed
  // this multimap and is expected to strike off the names it re-uses.
  for (Map.Entry<Class<? extends Channel>, Map<String, Channel>> entry :
      channelCache.entrySet()) {
    channelsNotReused.get(entry.getKey()).addAll(entry.getValue().keySet());
  }

  Set<String> channelNames = agentConf.getChannelSet();
  Map<String, ComponentConfiguration> compMap = agentConf.getChannelConfigMap();
  /*
   * Components which have a ComponentConfiguration object
   */
  for (String chName : channelNames) {
    ComponentConfiguration comp = compMap.get(chName);
    if (comp != null) {
      Channel channel = getOrCreateChannel(channelsNotReused,
          comp.getComponentName(), comp.getType());
      try {
        Configurables.configure(channel, comp);
        channelComponentMap.put(comp.getComponentName(),
            new ChannelComponent(channel));
        LOGGER.info("Created channel {}", chName);
      } catch (Exception e) {
        // NOTE(review): the channel is only kept out of channelComponentMap
        // here; this method does not evict it from channelCache - confirm
        // whether getOrCreateChannel has already cached it.
        // The trailing Throwable is logged as the exception, not as a
        // placeholder argument.
        LOGGER.error("Channel {} has been removed due to an "
            + "error during configuration", chName, e);
      }
    }
  }

  /*
   * Components which DO NOT have a ComponentConfiguration object
   * and use only Context
   */
  // Fetched once: the context map does not need to be looked up per channel.
  Map<String, Context> channelContexts = agentConf.getChannelContext();
  for (String chName : channelNames) {
    Context context = channelContexts.get(chName);
    if (context != null) {
      Channel channel = getOrCreateChannel(channelsNotReused, chName,
          context.getString(BasicConfigurationConstants.CONFIG_TYPE));
      try {
        Configurables.configure(channel, context);
        channelComponentMap.put(chName, new ChannelComponent(channel));
        LOGGER.info("Created channel {}", chName);
      } catch (Exception e) {
        // Same best-effort handling as above: log and move on.
        LOGGER.error("Channel {} has been removed due to an "
            + "error during configuration", chName, e);
      }
    }
  }

  /*
   * Any channel which was not re-used will have its reference removed
   */
  for (Class<? extends Channel> channelKlass : channelsNotReused.keySet()) {
    Map<String, Channel> channelMap = channelCache.get(channelKlass);
    if (channelMap != null) {
      for (String channelName : channelsNotReused.get(channelKlass)) {
        if (channelMap.remove(channelName) != null) {
          LOGGER.info("Removed {} of type {}", channelName, channelKlass);
        }
      }
      // Drop the per-class bucket entirely once its last channel is gone.
      if (channelMap.isEmpty()) {
        channelCache.remove(channelKlass);
      }
    }
  }
}