void StructuredConfiguration::parseProcessorNode()

in libminifi/src/core/flow/StructuredConfiguration.cpp [299:439]


/**
 * Creates one processor for every entry of the given processors node and adds it to the parent group.
 *
 * For each entry the name and type fields are checked with checkRequiredField; scheduling strategy
 * and scheduling period fall back to DEFAULT_SCHEDULING_STRATEGY / DEFAULT_SCHEDULING_PERIOD_STR.
 * The remaining fields (max concurrent tasks, penalization period, yield period, run duration,
 * auto-terminated relationships, properties) overwrite the corresponding core::ProcessorConfig
 * member only when they are present in the entry.
 *
 * @param processors_node node that must exist and be a sequence of processor definitions
 * @param parentGroup group that receives the created processors; when null an error is logged
 *                    and the function returns without parsing anything
 * @throws std::invalid_argument if processors_node is missing or is not a sequence, or if
 *                               createProcessor yields no processor for an entry
 */
void StructuredConfiguration::parseProcessorNode(const Node& processors_node, core::ProcessGroup* parentGroup) {
  if (!parentGroup) {
    logger_->log_error("parseProcessNode: no parent group exists");
    return;
  }

  // A missing node and a node that is not a sequence are rejected with the same message.
  if (!processors_node) {
    throw std::invalid_argument("Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
  }
  if (!processors_node.isSequence()) {
    throw std::invalid_argument(
        "Cannot instantiate a MiNiFi instance without a defined Processors configuration node.");
  }

  // Evaluate sequence of processors
  for (const auto& procNode : processors_node) {
    core::ProcessorConfig procCfg;

    checkRequiredField(procNode, schema_.name);
    procCfg.name = procNode[schema_.name].getString().value();
    procCfg.id = getOrGenerateId(procNode);

    // Per-entry state lives inside the loop so nothing leaks from one processor to the next.
    utils::Identifier uuid;
    uuid = procCfg.id;
    logger_->log_debug("parseProcessorNode: name => [{}] id => [{}]", procCfg.name, procCfg.id);
    checkRequiredField(procNode, schema_.type);
    procCfg.javaClass = procNode[schema_.type].getString().value();
    logger_->log_debug("parseProcessorNode: class => [{}]", procCfg.javaClass);

    // Determine the processor name only from the Java class (the part after the last '.')
    std::unique_ptr<core::Processor> processor =
        createProcessor(utils::string::partAfterLastOccurrenceOf(procCfg.javaClass, '.'), procCfg.javaClass, uuid);
    if (!processor) {
      logger_->log_error("Could not create a processor {} with id {}", procCfg.name, procCfg.id);
      throw std::invalid_argument("Could not create processor " + procCfg.name);
    }

    processor->setName(procCfg.name);

    processor->setFlowIdentifier(flow_version_->getFlowIdentifier());

    procCfg.schedulingStrategy = getOptionalField(procNode, schema_.scheduling_strategy, DEFAULT_SCHEDULING_STRATEGY);
    logger_->log_debug("parseProcessorNode: scheduling strategy => [{}]", procCfg.schedulingStrategy);

    procCfg.schedulingPeriod = getOptionalField(procNode, schema_.scheduling_period, DEFAULT_SCHEDULING_PERIOD_STR);

    logger_->log_debug("parseProcessorNode: scheduling period => [{}]", procCfg.schedulingPeriod);

    if (auto tasksNode = procNode[schema_.max_concurrent_tasks]) {
      procCfg.maxConcurrentTasks = tasksNode.getIntegerAsString().value();
      logger_->log_debug("parseProcessorNode: max concurrent tasks => [{}]", procCfg.maxConcurrentTasks);
    }

    if (auto penalizationNode = procNode[schema_.penalization_period]) {
      procCfg.penalizationPeriod = penalizationNode.getString().value();
      logger_->log_debug("parseProcessorNode: penalization period => [{}]", procCfg.penalizationPeriod);
    }

    if (auto yieldNode = procNode[schema_.proc_yield_period]) {
      procCfg.yieldPeriod = yieldNode.getString().value();
      logger_->log_debug("parseProcessorNode: yield period => [{}]", procCfg.yieldPeriod);
    }

    if (auto runNode = procNode[schema_.runduration_nanos]) {
      procCfg.runDurationNanos = runNode.getIntegerAsString().value();
      logger_->log_debug("parseProcessorNode: run duration nanos => [{}]", procCfg.runDurationNanos);
    }

    // handle auto-terminated relationships
    if (Node autoTerminatedSequence = procNode[schema_.autoterminated_rels]) {
      std::vector<std::string> rawAutoTerminatedRelationshipValues;
      if (autoTerminatedSequence.isSequence() && autoTerminatedSequence.size() > 0) {
        for (const auto& autoTerminatedRel : autoTerminatedSequence) {
          rawAutoTerminatedRelationshipValues.push_back(autoTerminatedRel.getString().value());
        }
      }
      // The field is present, so it replaces the config value even when the list ends up empty.
      procCfg.autoTerminatedRelationships = std::move(rawAutoTerminatedRelationshipValues);
    }

    // handle processor properties
    if (Node propertiesNode = procNode[schema_.processor_properties]) {
      parsePropertiesNode(propertiesNode, *processor, procCfg.name, parentGroup->getParameterContext());
    }

    // Take care of scheduling: timer/event driven processors interpret the period as a duration,
    // every other strategy hands the raw period string over as a cron expression.
    if (procCfg.schedulingStrategy == "TIMER_DRIVEN" || procCfg.schedulingStrategy == "EVENT_DRIVEN") {
      // A period that does not parse as a duration is skipped without touching the processor.
      if (auto scheduling_period = utils::timeutils::StringToDuration<std::chrono::nanoseconds>(procCfg.schedulingPeriod)) {
        logger_->log_debug("convert: parseProcessorNode: schedulingPeriod => [{}]", *scheduling_period);
        processor->setSchedulingPeriod(*scheduling_period);
      }
    } else {
      processor->setCronPeriod(procCfg.schedulingPeriod);
    }

    if (auto penalization_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.penalizationPeriod)) {
      logger_->log_debug("convert: parseProcessorNode: penalizationPeriod => [{}]", *penalization_period);
      processor->setPenalizationPeriod(*penalization_period);
    }

    if (auto yield_period = utils::timeutils::StringToDuration<std::chrono::milliseconds>(procCfg.yieldPeriod)) {
      logger_->log_debug("convert: parseProcessorNode: yieldPeriod => [{}]", *yield_period);
      processor->setYieldPeriodMsec(*yield_period);
    }

    // Default to running
    processor->setScheduledState(core::RUNNING);

    // Any strategy string other than TIMER_DRIVEN / EVENT_DRIVEN is treated as CRON_DRIVEN.
    if (procCfg.schedulingStrategy == "TIMER_DRIVEN") {
      processor->setSchedulingStrategy(core::TIMER_DRIVEN);
    } else if (procCfg.schedulingStrategy == "EVENT_DRIVEN") {
      processor->setSchedulingStrategy(core::EVENT_DRIVEN);
    } else {
      processor->setSchedulingStrategy(core::CRON_DRIVEN);
    }
    logger_->log_debug("setting scheduling strategy as {}", procCfg.schedulingStrategy);

    if (auto max_concurrent_tasks = parsing::parseIntegral<uint8_t>(procCfg.maxConcurrentTasks)) {
      logger_->log_debug("parseProcessorNode: maxConcurrentTasks => [{}]", *max_concurrent_tasks);
      processor->setMaxConcurrentTasks(*max_concurrent_tasks);
    }

    if (auto run_duration_nanos = parsing::parseIntegral<uint64_t>(procCfg.runDurationNanos)) {
      logger_->log_debug("parseProcessorNode: runDurationNanos => [{}]", *run_duration_nanos);
      processor->setRunDurationNano(std::chrono::nanoseconds(*run_duration_nanos));
    }

    // Relationships are built from the name only; the description is left empty.
    std::vector<core::Relationship> autoTerminatedRelationships;
    autoTerminatedRelationships.reserve(procCfg.autoTerminatedRelationships.size());
    for (const auto& relString : procCfg.autoTerminatedRelationships) {
      logger_->log_debug("parseProcessorNode: autoTerminatedRelationship  => [{}]", relString);
      autoTerminatedRelationships.emplace_back(relString, "");
    }

    processor->setAutoTerminatedRelationships(autoTerminatedRelationships);

    // Ownership of the fully configured processor moves to the parent group.
    parentGroup->addProcessor(std::move(processor));
  }
}