in nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java [225:647]
public void instantiate(final FlowManager flowManager, final FlowController flowController, final ProcessGroup group, final boolean topLevel) {
//
// Instantiate Controller Services
//
final List<ControllerServiceNode> serviceNodes = new ArrayList<>();
try {
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
final ControllerServiceNode serviceNode = flowManager.createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(),
bundleCoordinate, Collections.emptySet(), true, true, null);
serviceNode.pauseValidationTrigger();
serviceNodes.add(serviceNode);
serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
serviceNode.setComments(controllerServiceDTO.getComments());
serviceNode.setName(controllerServiceDTO.getName());
if (controllerServiceDTO.getBulletinLevel() != null) {
serviceNode.setBulletinLevel(LogLevel.valueOf(controllerServiceDTO.getBulletinLevel()));
} else {
// this situation exists for backward compatibility with nifi 1.16 and earlier where controller services do not have bulletinLevels set in flow.xml/flow.json
// and bulletinLevels are at the WARN level by default
serviceNode.setBulletinLevel(LogLevel.WARN);
}
if (!topLevel) {
serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
}
group.addControllerService(serviceNode);
}
// configure controller services. We do this after creating all of them in case 1 service
// references another service.
for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
final String serviceId = controllerServiceDTO.getId();
final ControllerServiceNode serviceNode = flowManager.getControllerServiceNode(serviceId);
final Set<String> sensitiveDynamicPropertyNames = controllerServiceDTO.getSensitiveDynamicPropertyNames();
serviceNode.setProperties(controllerServiceDTO.getProperties(), false, sensitiveDynamicPropertyNames == null ? Collections.emptySet() : sensitiveDynamicPropertyNames);
}
} finally {
serviceNodes.forEach(ControllerServiceNode::resumeValidationTrigger);
}
//
// Instantiate the labels
//
for (final LabelDTO labelDTO : dto.getLabels()) {
final Label label = flowManager.createLabel(labelDTO.getId(), labelDTO.getLabel());
label.setPosition(toPosition(labelDTO.getPosition()));
if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
}
label.setStyle(labelDTO.getStyle());
if (labelDTO.getzIndex() != null) {
label.setZIndex(label.getZIndex());
}
if (!topLevel) {
label.setVersionedComponentId(labelDTO.getVersionedComponentId());
}
group.addLabel(label);
}
// Instantiate the funnels
for (final FunnelDTO funnelDTO : dto.getFunnels()) {
final Funnel funnel = flowManager.createFunnel(funnelDTO.getId());
funnel.setPosition(toPosition(funnelDTO.getPosition()));
if (!topLevel) {
funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
}
group.addFunnel(funnel);
}
//
// Instantiate Input Ports & Output Ports
//
for (final PortDTO portDTO : dto.getInputPorts()) {
final Port inputPort;
if (group.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
final String portName = generatePublicInputPortName(flowManager, portDTO.getName());
inputPort = flowManager.createPublicInputPort(portDTO.getId(), portName);
} else {
inputPort = flowManager.createLocalInputPort(portDTO.getId(), portDTO.getName());
}
if (!topLevel) {
inputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
}
inputPort.setPosition(toPosition(portDTO.getPosition()));
inputPort.setProcessGroup(group);
inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
inputPort.setComments(portDTO.getComments());
if (portDTO.getState().equals(ScheduledState.DISABLED.toString())) {
inputPort.disable();
}
group.addInputPort(inputPort);
}
for (final PortDTO portDTO : dto.getOutputPorts()) {
final Port outputPort;
if (group.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
final String portName = generatePublicOutputPortName(flowManager, portDTO.getName());
outputPort = flowManager.createPublicOutputPort(portDTO.getId(), portName);
} else {
outputPort = flowManager.createLocalOutputPort(portDTO.getId(), portDTO.getName());
}
if (!topLevel) {
outputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
}
outputPort.setPosition(toPosition(portDTO.getPosition()));
outputPort.setProcessGroup(group);
outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
outputPort.setComments(portDTO.getComments());
if (portDTO.getState().equals(ScheduledState.DISABLED.toString())) {
outputPort.disable();
}
group.addOutputPort(outputPort);
}
//
// Instantiate the processors
//
for (final ProcessorDTO processorDTO : dto.getProcessors()) {
final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, processorDTO.getType(), processorDTO.getBundle());
final ProcessorNode procNode = flowManager.createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
procNode.pauseValidationTrigger();
try {
procNode.setPosition(toPosition(processorDTO.getPosition()));
procNode.setProcessGroup(group);
if (!topLevel) {
procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
}
final ProcessorConfigDTO config = processorDTO.getConfig();
procNode.setComments(config.getComments());
if (config.isLossTolerant() != null) {
procNode.setLossTolerant(config.isLossTolerant());
}
procNode.setName(processorDTO.getName());
procNode.setYieldPeriod(config.getYieldDuration());
procNode.setPenalizationPeriod(config.getPenaltyDuration());
procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
procNode.setAnnotationData(config.getAnnotationData());
procNode.setRetryCount(config.getRetryCount());
procNode.setRetriedRelationships(config.getRetriedRelationships());
if (config.getBackoffMechanism() != null) {
procNode.setBackoffMechanism(BackoffMechanism.valueOf(config.getBackoffMechanism()));
}
procNode.setMaxBackoffPeriod(config.getMaxBackoffPeriod());
procNode.setStyle(processorDTO.getStyle());
if (config.getRunDurationMillis() != null) {
procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
}
if (config.getSchedulingStrategy() != null) {
procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
}
if (config.getExecutionNode() != null) {
procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
}
if (processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
procNode.disable();
}
// ensure that the scheduling strategy is set prior to these values
procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
procNode.setSchedulingPeriod(config.getSchedulingPeriod());
final Set<Relationship> relationships = new HashSet<>();
if (processorDTO.getRelationships() != null) {
for (final RelationshipDTO rel : processorDTO.getRelationships()) {
if (rel.isAutoTerminate()) {
relationships.add(procNode.getRelationship(rel.getName()));
}
}
procNode.setAutoTerminatedRelationships(relationships);
}
// We need to add the processor to the ProcessGroup before calling ProcessorNode.setProperties. This will notify the FlowManager that the Processor
// has been added to the flow, which is important before calling ProcessorNode.setProperties, since #setProperties may call methods that result in looking
// up a Controller Service (such as #getClassloaderIsolationKey). The Processor must be registered with the FlowManager and its parent Process Group
// before that can happen, in order to ensure that it has access to any referenced Controller Service.
group.addProcessor(procNode);
if (config.getProperties() != null) {
final Set<String> sensitiveDynamicPropertyNames = config.getSensitiveDynamicPropertyNames();
procNode.setProperties(config.getProperties(), false, sensitiveDynamicPropertyNames == null ? Collections.emptySet() : sensitiveDynamicPropertyNames);
}
// Notify the processor node that the configuration (properties, e.g.) has been restored
final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()), () -> false, flowController);
procNode.onConfigurationRestored(processContext);
} finally {
procNode.resumeValidationTrigger();
}
}
//
// Instantiate Remote Process Groups
//
for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
final RemoteProcessGroup remoteGroup = flowManager.createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUris());
remoteGroup.setComments(remoteGroupDTO.getComments());
remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
if (!topLevel) {
remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId());
}
if (remoteGroupDTO.getTransportProtocol() == null) {
remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW);
} else {
remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(remoteGroupDTO.getTransportProtocol()));
}
remoteGroup.setProxyHost(remoteGroupDTO.getProxyHost());
remoteGroup.setProxyPort(remoteGroupDTO.getProxyPort());
remoteGroup.setProxyUser(remoteGroupDTO.getProxyUser());
remoteGroup.setProxyPassword(remoteGroupDTO.getProxyPassword());
remoteGroup.setProcessGroup(group);
// set the input/output ports
if (remoteGroupDTO.getContents() != null) {
final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();
// ensure there are input ports
if (contents.getInputPorts() != null) {
remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false);
}
// ensure there are output ports
if (contents.getOutputPorts() != null) {
remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false);
}
}
group.addRemoteProcessGroup(remoteGroup);
}
//
// Instantiate ProcessGroups
//
for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
final ProcessGroup childGroup = flowManager.createProcessGroup(groupDTO.getId());
childGroup.setParent(group);
childGroup.setPosition(toPosition(groupDTO.getPosition()));
childGroup.setComments(groupDTO.getComments());
childGroup.setName(groupDTO.getName());
childGroup.setExecutionEngine(ExecutionEngine.valueOf(groupDTO.getExecutionEngine()));
childGroup.setStatelessFlowTimeout(groupDTO.getStatelessFlowTimeout());
childGroup.setMaxConcurrentTasks(groupDTO.getMaxConcurrentTasks());
final String flowfileConcurrentName = groupDTO.getFlowfileConcurrency();
if (flowfileConcurrentName != null) {
childGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrentName));
}
final String outboundPolicyName = groupDTO.getFlowfileOutboundPolicy();
if (outboundPolicyName != null) {
childGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(outboundPolicyName));
}
final ParameterContextReferenceEntity parameterContextReference = groupDTO.getParameterContext();
if (parameterContextReference != null) {
final ParameterContext parameterContext = flowManager.getParameterContextManager().getParameterContext(parameterContextReference.getId());
if (parameterContext != null) {
childGroup.setParameterContext(parameterContext);
}
}
final String defaultFlowFileExpiration = groupDTO.getDefaultFlowFileExpiration();
if (defaultFlowFileExpiration != null) {
childGroup.setDefaultFlowFileExpiration(defaultFlowFileExpiration);
}
final Long defaultBackPressureObjectThreshold = groupDTO.getDefaultBackPressureObjectThreshold();
if (defaultBackPressureObjectThreshold != null) {
childGroup.setDefaultBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
}
final String defaultBackPressureDataSizeThreshold = groupDTO.getDefaultBackPressureDataSizeThreshold();
if (defaultBackPressureDataSizeThreshold != null) {
childGroup.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
}
final String logFileSuffix = groupDTO.getLogFileSuffix();
if (logFileSuffix != null) {
childGroup.setLogFileSuffix(logFileSuffix);
}
// If this Process Group is 'top level' then we do not set versioned component ID's.
// We do this only if this component is the child of a Versioned Component.
if (!topLevel) {
childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId());
}
group.addProcessGroup(childGroup);
final FlowSnippetDTO contents = groupDTO.getContents();
// we want this to be recursive, so we will create a new template that contains only
// the contents of this child group and recursively call ourselves.
final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO();
childTemplateDTO.setConnections(contents.getConnections());
childTemplateDTO.setInputPorts(contents.getInputPorts());
childTemplateDTO.setLabels(contents.getLabels());
childTemplateDTO.setOutputPorts(contents.getOutputPorts());
childTemplateDTO.setProcessGroups(contents.getProcessGroups());
childTemplateDTO.setProcessors(contents.getProcessors());
childTemplateDTO.setFunnels(contents.getFunnels());
childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
childTemplateDTO.setControllerServices(contents.getControllerServices());
final StandardFlowSnippet childSnippet = new StandardFlowSnippet(childTemplateDTO, extensionManager);
childSnippet.instantiate(flowManager, flowController, childGroup, false);
if (groupDTO.getVersionControlInformation() != null) {
final VersionControlInformation vci = StandardVersionControlInformation.Builder
.fromDto(groupDTO.getVersionControlInformation())
.build();
childGroup.setVersionControlInformation(vci, Collections.emptyMap());
}
}
//
// Instantiate Connections
//
for (final ConnectionDTO connectionDTO : dto.getConnections()) {
final ConnectableDTO sourceDTO = connectionDTO.getSource();
final ConnectableDTO destinationDTO = connectionDTO.getDestination();
final Connectable source;
final Connectable destination;
// Locate the source and destination connectable. If this is a remote port we need to locate the remote process groups. Otherwise, we need to
// find the connectable given its parent group.
//
// NOTE: (getConnectable returns ANY connectable, when the parent is not this group only input ports or output ports should be returned. If something
// other than a port is returned, an exception will be thrown when adding the connection below.)
// See if the source connectable is a remote port
if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId());
source = remoteGroup.getOutputPort(sourceDTO.getId());
} else {
final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId(), flowManager);
source = sourceGroup.getConnectable(sourceDTO.getId());
}
// see if the destination connectable is a remote port
if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) {
final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId());
destination = remoteGroup.getInputPort(destinationDTO.getId());
} else {
final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId(), flowManager);
destination = destinationGroup.getConnectable(destinationDTO.getId());
}
// determine the selection relationships for this connection
final Set<String> relationships = new HashSet<>();
if (connectionDTO.getSelectedRelationships() != null) {
relationships.addAll(connectionDTO.getSelectedRelationships());
}
final Connection connection = flowManager.createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
if (!topLevel) {
connection.setVersionedComponentId(connectionDTO.getVersionedComponentId());
}
if (connectionDTO.getzIndex() != null) {
connection.setZIndex(connection.getZIndex());
}
if (connectionDTO.getBends() != null) {
final List<Position> bendPoints = new ArrayList<>();
for (final PositionDTO bend : connectionDTO.getBends()) {
bendPoints.add(new Position(bend.getX(), bend.getY()));
}
connection.setBendPoints(bendPoints);
}
final FlowFileQueue queue = connection.getFlowFileQueue();
queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration());
final List<String> prioritizers = connectionDTO.getPrioritizers();
if (prioritizers != null) {
final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>();
for (final String className : newPrioritizersClasses) {
try {
newPrioritizers.add(flowManager.createPrioritizer(className));
} catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e);
}
}
queue.setPriorities(newPrioritizers);
}
final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
if (loadBalanceStrategyName != null) {
final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
final String partitioningAttribute = connectionDTO.getLoadBalancePartitionAttribute();
queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute);
}
connection.setProcessGroup(group);
group.addConnection(connection);
}
}