in data-prepper-core/src/main/java/com/amazon/dataprepper/parser/PipelineParser.java [81:118]
private void buildPipelineFromConfiguration(
final String pipelineName,
final Map<String, PipelineConfiguration> pipelineConfigurationMap,
final Map<String, Pipeline> pipelineMap) {
final PipelineConfiguration pipelineConfiguration = pipelineConfigurationMap.get(pipelineName);
LOG.info("Building pipeline [{}] from provided configuration", pipelineName);
try {
final PluginSetting sourceSetting = pipelineConfiguration.getSourcePluginSetting();
final Optional<Source> pipelineSource = getSourceIfPipelineType(pipelineName, sourceSetting,
pipelineMap, pipelineConfigurationMap);
final Source source = pipelineSource.orElseGet(() ->
pluginFactory.loadPlugin(Source.class, sourceSetting));
LOG.info("Building buffer for the pipeline [{}]", pipelineName);
final Buffer buffer = pluginFactory.loadPlugin(Buffer.class, pipelineConfiguration.getBufferPluginSetting());
LOG.info("Building processors for the pipeline [{}]", pipelineName);
final int processorThreads = pipelineConfiguration.getWorkers();
final List<List<Processor>> processorSets = pipelineConfiguration.getProcessorPluginSettings().stream()
.map(this::newProcessor)
.collect(Collectors.toList());
final int readBatchDelay = pipelineConfiguration.getReadBatchDelay();
LOG.info("Building sinks for the pipeline [{}]", pipelineName);
final List<Sink> sinks = pipelineConfiguration.getSinkPluginSettings().stream()
.map(this::buildSinkOrConnector)
.collect(Collectors.toList());
final Pipeline pipeline = new Pipeline(pipelineName, source, buffer, processorSets, sinks, processorThreads, readBatchDelay);
pipelineMap.put(pipelineName, pipeline);
} catch (Exception ex) {
//If pipeline construction errors out, we will skip that pipeline and proceed
LOG.error("Construction of pipeline components failed, skipping building of pipeline [{}] and its connected " +
"pipelines", pipelineName, ex);
processRemoveIfRequired(pipelineName, pipelineConfigurationMap, pipelineMap);
}
}