in libminifi/src/core/flow/StructuredConfiguration.cpp [299:439]
void StructuredConfiguration::parseProcessorNode(const Node& processors_node, core::ProcessGroup* parentGroup) {
utils::Identifier uuid;
std::unique_ptr<core::Processor> processor;
if (!parentGroup) {
logger_->log_error("parseProcessNode: no parent group exists");
return;
}
if (!processors_node) {
throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
}
if (!processors_node.isSequence()) {
throw std::invalid_argument(
"Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
}
// Evaluate sequence of processors
for (const auto& procNode : processors_node) {
core::ProcessorConfig procCfg;
checkRequiredField(procNode, schema_.name);
procCfg.name = procNode[schema_.name].getString().value();
procCfg.id = getOrGenerateId(procNode);
uuid = procCfg.id;
logger_->log_debug("parseProcessorNode: name => [{}] id => [{}]", procCfg.name, procCfg.id);
checkRequiredField(procNode, schema_.type);
procCfg.javaClass = procNode[schema_.type].getString().value();
logger_->log_debug("parseProcessorNode: class => [{}]", procCfg.javaClass);
// Determine the processor name only from the Java class
processor = createProcessor(utils::string::partAfterLastOccurrenceOf(procCfg.javaClass, '.'), procCfg.javaClass, uuid);
if (!processor) {
logger_->log_error("Could not create a processor {} with id {}", procCfg.name, procCfg.id);
throw std::invalid_argument("Could not create processor " + procCfg.name);
}
processor->setName(procCfg.name);
processor->setFlowIdentifier(flow_version_->getFlowIdentifier());
procCfg.schedulingStrategy = getOptionalField(procNode, schema_.scheduling_strategy, DEFAULT_SCHEDULING_STRATEGY);
logger_->log_debug("parseProcessorNode: scheduling strategy => [{}]", procCfg.schedulingStrategy);
procCfg.schedulingPeriod = getOptionalField(procNode, schema_.scheduling_period, DEFAULT_SCHEDULING_PERIOD_STR);
logger_->log_debug("parseProcessorNode: scheduling period => [{}]", procCfg.schedulingPeriod);
if (auto tasksNode = procNode[schema_.max_concurrent_tasks]) {
procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
logger_->log_debug("parseProcessorNode: max concurrent tasks => [{}]", procCfg.maxConcurrentTasks);
}
if (auto penalizationNode = procNode[schema_.penalization_period]) {
procCfg.penalizationPeriod = penalizationNode.getString().value();
logger_->log_debug("parseProcessorNode: penalization period => [{}]", procCfg.penalizationPeriod);
}
if (auto yieldNode = procNode[schema_.proc_yield_period]) {
procCfg.yieldPeriod = yieldNode.getString().value();
logger_->log_debug("parseProcessorNode: yield period => [{}]", procCfg.yieldPeriod);
}
if (auto runNode = procNode[schema_.runduration_nanos]) {
procCfg.runDurationNanos = runNode.getIntegerAsString().value();
logger_->log_debug("parseProcessorNode: run duration nanos => [{}]", procCfg.runDurationNanos);
}
// handle auto-terminated relationships
if (Node autoTerminatedSequence = procNode[schema_.autoterminated_rels]) {
std::vector<std::string> rawAutoTerminatedRelationshipValues;
if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
for (const auto& autoTerminatedRel : autoTerminatedSequence) {
rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel.getString().value());
}
}
procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
}
// handle processor properties
if (Node propertiesNode = procNode[schema_.processor_properties]) {
parsePropertiesNode(propertiesNode, *processor, procCfg.name, parentGroup->getParameterContext());
}
// Take care of scheduling
if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) {
logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [{}]", scheduling_period);
processor->setSchedulingPeriod(*scheduling_period);
}
} else {
processor->setCronPeriod(procCfg.schedulingPeriod);
}
if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) {
logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [{}]", penalization_period);
processor->setPenalizationPeriod(penalization_period.value());
}
if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) {
logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [{}]", yield_period);
processor->setYieldPeriodMsec(yield_period.value());
}
// Default to running
processor->setScheduledState(core::RUNNING);
if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
processor->setSchedulingStrategy(core::TIMER_DRIVEN);
logger_->log_debug("setting scheduling strategy as {}", procCfg.schedulingStrategy);
} else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
processor->setSchedulingStrategy(core::EVENT_DRIVEN);
logger_->log_debug("setting scheduling strategy as {}", procCfg.schedulingStrategy);
} else {
processor->setSchedulingStrategy(core::CRON_DRIVEN);
logger_->log_debug("setting scheduling strategy as {}", procCfg.schedulingStrategy);
}
if (auto max_concurrent_tasks = parsing::parseIntegral<uint8_t>(procCfg.maxConcurrentTasks)) {
logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [{}]", *max_concurrent_tasks);
processor->setMaxConcurrentTasks(*max_concurrent_tasks);
}
if (auto run_duration_nanos = parsing::parseIntegral<uint64_t>(procCfg.runDurationNanos)) {
logger_->log_debug("parseProcessorNode: runDurationNanos => [{}]", *run_duration_nanos);
processor->setRunDurationNano(std::chrono::nanoseconds(*run_duration_nanos));
}
std::vector<core::Relationship> autoTerminatedRelationships;
for (auto &&relString : procCfg.autoTerminatedRelationships) {
core::Relationship relationship(relString, "");
logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [{}]", relString);
autoTerminatedRelationships.push_back(relationship);
}
processor->setAutoTerminatedRelationships(autoTerminatedRelationships);
parentGroup->addProcessor(std::move(processor));
}
}