in libminifi/src/core/ProcessGroup.cpp [390:450]
void ProcessGroup::addConnection(std::unique_ptr<Connection> connection) {
std::lock_guard<std::recursive_mutex> lock(mutex_);
auto [insertPos, inserted] = connections_.insert(std::move(connection));
if (!inserted) {
return;
}
auto& insertedConnection = *insertPos;
logger_->log_debug("Add connection {} into process group {}", insertedConnection->getName(), name_);
// only allow connections between processors of the same process group or in/output ports of child process groups
// check input and output ports connection restrictions inside and outside a process group
Processor* source = findPortById(insertedConnection->getSourceUUID());
if (source && dynamic_cast<Port*>(source)->getPortType() == PortType::OUTPUT) {
logger_->log_error("Output port [id = '{}'] cannot be a source inside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
source = nullptr;
} else if (!source) {
source = findChildPortById(insertedConnection->getSourceUUID());
if (source && dynamic_cast<Port*>(source)->getPortType() == PortType::INPUT) {
logger_->log_error("Input port [id = '{}'] cannot be a source outside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
source = nullptr;
} else if (!source) {
source = findProcessorById(insertedConnection->getSourceUUID(), Traverse::ExcludeChildren);
if (!source) {
logger_->log_error("Cannot find the source processor with id '{}' for the connection [name = '{}', id = '{}']",
insertedConnection->getSourceUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
}
}
}
if (source) {
source->addConnection(insertedConnection.get());
}
Processor* destination = findPortById(insertedConnection->getDestinationUUID());
if (destination && dynamic_cast<Port*>(destination)->getPortType() == PortType::INPUT) {
logger_->log_error("Input port [id = '{}'] cannot be a destination inside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
destination = nullptr;
} else if (!destination) {
destination = findChildPortById(insertedConnection->getDestinationUUID());
if (destination && dynamic_cast<Port*>(destination)->getPortType() == PortType::OUTPUT) {
logger_->log_error("Output port [id = '{}'] cannot be a destination outside the process group in the connection [name = '{}', id = '{}']",
insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
destination = nullptr;
} else if (!destination) {
destination = findProcessorById(insertedConnection->getDestinationUUID(), Traverse::ExcludeChildren);
if (!destination) {
logger_->log_error("Cannot find the destination processor with id '{}' for the connection [name = '{}', id = '{}']",
insertedConnection->getDestinationUUID().to_string(), insertedConnection->getName(), insertedConnection->getUUIDStr());
}
}
}
if (destination && destination != source) {
destination->addConnection(insertedConnection.get());
}
}