void ProcessGroup::addConnection()

in libminifi/src/core/ProcessGroup.cpp [390:450]


void ProcessGroup::addConnection(std::unique_ptr<Connection> connection) {
  std::lock_guard<std::recursive_mutex> lock(mutex_);

  auto [insertPos, inserted] = connections_.insert(std::move(connection));
  if (!inserted) {
    return;
  }

  auto& insertedConnection = *insertPos;

  logger_->log_debug("Add connection {} into process group {}", insertedConnection->getName(), name_);
  // only allow connections between processors of the same process group or in/output ports of child process groups
  // check input and output ports connection restrictions inside and outside a process group
  Processor* source = findPortById(insertedConnection->getSourceUUID());
  if (source && dynamic_cast<Port*>(source)->getPortType() == PortType::OUTPUT) {
    logger_->log_error("Output port [id = '{}'] cannot be a source inside the process group in the connection [name = '{}', id = '{}']",
                       insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
    source = nullptr;
  } else if (!source) {
    source = findChildPortById(insertedConnection->getSourceUUID());
    if (source && dynamic_cast<Port*>(source)->getPortType() == PortType::INPUT) {
      logger_->log_error("Input port [id = '{}'] cannot be a source outside the process group in the connection [name = '{}', id = '{}']",
                          insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
      source = nullptr;
    } else if (!source) {
      source = findProcessorById(insertedConnection->getSourceUUID(), Traverse::ExcludeChildren);
      if (!source) {
        logger_->log_error("Cannot find the source processor with id '{}' for the connection [name = '{}', id = '{}']",
                          insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
      }
    }
  }

  if (source) {
    source->addConnection(insertedConnection.get());
  }

  Processor* destination = findPortById(insertedConnection->getDestinationUUID());
  if (destination && dynamic_cast<Port*>(destination)->getPortType() == PortType::INPUT) {
    logger_->log_error("Input port [id = '{}'] cannot be a destination inside the process group in the connection [name = '{}', id = '{}']",
                       insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
    destination = nullptr;
  } else if (!destination) {
    destination = findChildPortById(insertedConnection->getDestinationUUID());
    if (destination && dynamic_cast<Port*>(destination)->getPortType() == PortType::OUTPUT) {
      logger_->log_error("Output port [id = '{}'] cannot be a destination outside the process group in the connection [name = '{}', id = '{}']",
                          insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
      destination = nullptr;
    } else if (!destination) {
      destination = findProcessorById(insertedConnection->getDestinationUUID(), Traverse::ExcludeChildren);
      if (!destination) {
        logger_->log_error("Cannot find the destination processor with id '{}' for the connection [name = '{}', id = '{}']",
                          insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
      }
    }
  }

  if (destination && destination != source) {
    destination->addConnection(insertedConnection.get());
  }
}