in libminifi/src/core/flow/StructuredConfiguration.cpp [151:302]
/**
 * Creates one processor for every entry of the processors sequence, configures it from
 * the entry's fields and adds it to the parent process group.
 *
 * For each entry the name and type are required; scheduling strategy and scheduling period
 * fall back to their defaults, and the remaining fields (max concurrent tasks, penalization
 * period, yield period, run duration, auto-terminated relationships, properties) are only
 * applied when present in the node.
 *
 * @param processors_node node expected to hold a sequence of processor definitions
 * @param parentGroup group receiving the created processors; when null an error is logged
 *                    and nothing is parsed
 * @throws std::invalid_argument if the node is missing or is not a sequence, if a processor
 *         cannot be created, or if the max concurrent tasks value does not fit into uint8_t
 */
void StructuredConfiguration::parseProcessorNode(const Node& processors_node, core::ProcessGroup* parentGroup) {
  if (!parentGroup) {
    logger_->log_error("parseProcessorNode: no parent group exists");
    return;
  }

  // A missing node and a non-sequence node are reported with the same message.
  if (!processors_node || !processors_node.isSequence()) {
    throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
  }

  // Evaluate sequence of processors
  for (const auto& procNode : processors_node) {
    core::ProcessorConfig procCfg;

    checkRequiredField(procNode, schema_.name);
    procCfg.name = procNode[schema_.name].getString().value();
    procCfg.id = getOrGenerateId(procNode);
    utils::Identifier uuid;
    uuid = procCfg.id;
    logger_->log_debug("parseProcessorNode: name => [%s] id => [%s]", procCfg.name, procCfg.id);

    checkRequiredField(procNode, schema_.type);
    procCfg.javaClass = procNode[schema_.type].getString().value();
    logger_->log_debug("parseProcessorNode: class => [%s]", procCfg.javaClass);

    // Determine the processor name only from the Java class
    std::unique_ptr<core::Processor> processor;
    const auto lastDotIdx = procCfg.javaClass.find_last_of('.');
    if (lastDotIdx != std::string::npos) {
      // Skip past the '.' so only the unqualified class name remains
      const std::string processorName = procCfg.javaClass.substr(lastDotIdx + 1);
      processor = this->createProcessor(processorName, procCfg.javaClass, uuid);
    } else {
      // Allow unqualified class names for core processors
      processor = this->createProcessor(procCfg.javaClass, uuid);
    }

    if (!processor) {
      logger_->log_error("Could not create a processor %s with id %s", procCfg.name, procCfg.id);
      throw std::invalid_argument("Could not create processor " + procCfg.name);
    }

    processor->setName(procCfg.name);
    processor->setFlowIdentifier(flow_version_->getFlowIdentifier());

    procCfg.schedulingStrategy = getOptionalField(procNode, schema_.scheduling_strategy, DEFAULT_SCHEDULING_STRATEGY);
    logger_->log_debug("parseProcessorNode: scheduling strategy => [%s]", procCfg.schedulingStrategy);

    procCfg.schedulingPeriod = getOptionalField(procNode, schema_.scheduling_period, DEFAULT_SCHEDULING_PERIOD_STR);
    logger_->log_debug("parseProcessorNode: scheduling period => [%s]", procCfg.schedulingPeriod);

    if (auto tasksNode = procNode[schema_.max_concurrent_tasks]) {
      procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
      logger_->log_debug("parseProcessorNode: max concurrent tasks => [%s]", procCfg.maxConcurrentTasks);
    }

    if (auto penalizationNode = procNode[schema_.penalization_period]) {
      procCfg.penalizationPeriod = penalizationNode.getString().value();
      logger_->log_debug("parseProcessorNode: penalization period => [%s]", procCfg.penalizationPeriod);
    }

    if (auto yieldNode = procNode[schema_.proc_yield_period]) {
      procCfg.yieldPeriod = yieldNode.getString().value();
      logger_->log_debug("parseProcessorNode: yield period => [%s]", procCfg.yieldPeriod);
    }

    if (auto runNode = procNode[schema_.runduration_nanos]) {
      procCfg.runDurationNanos = runNode.getIntegerAsString().value();
      logger_->log_debug("parseProcessorNode: run duration nanos => [%s]", procCfg.runDurationNanos);
    }

    // handle auto-terminated relationships
    if (Node autoTerminatedSequence = procNode[schema_.autoterminated_rels]) {
      std::vector<std::string> rawAutoTerminatedRelationshipValues;
      if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
        for (const auto& autoTerminatedRel : autoTerminatedSequence) {
          rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel.getString().value());
        }
      }
      procCfg.autoTerminatedRelationships = rawAutoTerminatedRelationshipValues;
    }

    // handle processor properties
    if (Node propertiesNode = procNode[schema_.processor_properties]) {
      parsePropertiesNode(propertiesNode, *processor, procCfg.name);
    }

    // Take care of scheduling
    const bool isTimerDriven = procCfg.schedulingStrategy == "TIMER_DRIVEN";
    const bool isEventDriven = procCfg.schedulingStrategy == "EVENT_DRIVEN";

    if (isTimerDriven || isEventDriven) {
      // An unparsable period is skipped here, leaving whatever period the processor already has.
      if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) {
        logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [%" PRId64 "] ns", scheduling_period->count());
        processor->setSchedulingPeriod(*scheduling_period);
      }
    } else {
      // Any other strategy is treated as cron driven; the period string is passed through as-is.
      processor->setCronPeriod(procCfg.schedulingPeriod);
    }

    if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) {
      logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [%" PRId64 "] ms", penalization_period->count());
      processor->setPenalizationPeriod(penalization_period.value());
    }

    if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) {
      logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [%" PRId64 "] ms", yield_period->count());
      processor->setYieldPeriodMsec(yield_period.value());
    }

    // Default to running
    processor->setScheduledState(core::RUNNING);

    if (isTimerDriven) {
      processor->setSchedulingStrategy(core::TIMER_DRIVEN);
    } else if (isEventDriven) {
      processor->setSchedulingStrategy(core::EVENT_DRIVEN);
    } else {
      processor->setSchedulingStrategy(core::CRON_DRIVEN);
    }
    logger_->log_debug("setting scheduling strategy as %s", procCfg.schedulingStrategy);

    int32_t maxConcurrentTasks = 0;
    if (core::Property::StringToInt(procCfg.maxConcurrentTasks, maxConcurrentTasks)) {
      logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [%d]", maxConcurrentTasks);
      // Reject values that would silently wrap when narrowed to uint8_t (e.g. 256 -> 0, -1 -> 255).
      const auto narrowedTasks = static_cast<uint8_t>(maxConcurrentTasks);
      if (static_cast<int32_t>(narrowedTasks) != maxConcurrentTasks) {
        logger_->log_error("Max concurrent tasks value %d of processor %s is out of range", maxConcurrentTasks, procCfg.name);
        throw std::invalid_argument("Max concurrent tasks value is out of range for processor " + procCfg.name);
      }
      processor->setMaxConcurrentTasks(narrowedTasks);
    }

    int64_t runDurationNanos = -1;
    if (core::Property::StringToInt(procCfg.runDurationNanos, runDurationNanos)) {
      // 64-bit value: must be logged with PRId64 rather than %d
      logger_->log_debug("parseProcessorNode: runDurationNanos => [%" PRId64 "]", runDurationNanos);
      processor->setRunDurationNano(std::chrono::nanoseconds(runDurationNanos));
    }

    std::vector<core::Relationship> autoTerminatedRelationships;
    for (const auto& relString : procCfg.autoTerminatedRelationships) {
      core::Relationship relationship(relString, "");
      logger_->log_debug("parseProcessorNode: autoTerminatedRelationship => [%s]", relString);
      autoTerminatedRelationships.push_back(relationship);
    }
    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);

    // Ownership of the fully configured processor is handed over to the group.
    parentGroup->addProcessor(std::move(processor));
  }
}