public void instantiate()

in nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSnippet.java [225:647]


    /**
     * Instantiates the components described by this snippet's {@code dto} and adds them to the given Process Group.
     * Components are created in the following order: Controller Services, Labels, Funnels, Input Ports, Output Ports,
     * Processors, Remote Process Groups, child Process Groups (whose contents are instantiated recursively) and
     * finally Connections, whose source and destination are looked up from the already-populated group(s).
     *
     * @param flowManager the FlowManager used to create each component
     * @param flowController the FlowController that supplies the Controller Service Provider and State Manager used to
     *                       build the process context passed to each Processor's {@code onConfigurationRestored}
     * @param group the Process Group to which the created components are added
     * @param topLevel if {@code false}, the Versioned Component ID of each DTO is copied onto the created component;
     *                 if {@code true}, Versioned Component IDs are not set. Recursive calls for child groups always pass {@code false}.
     * @throws IllegalArgumentException if a FlowFile prioritizer named by a connection cannot be created
     */
    public void instantiate(final FlowManager flowManager, final FlowController flowController, final ProcessGroup group, final boolean topLevel) {
        //
        // Instantiate Controller Services
        //
        final List<ControllerServiceNode> serviceNodes = new ArrayList<>();
        try {
            for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
                final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, controllerServiceDTO.getType(), controllerServiceDTO.getBundle());
                final ControllerServiceNode serviceNode = flowManager.createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(),
                    bundleCoordinate, Collections.emptySet(), true, true, null);

                // Validation is paused while the service is being configured; it is resumed in the finally block below
                serviceNode.pauseValidationTrigger();
                serviceNodes.add(serviceNode);

                serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
                serviceNode.setComments(controllerServiceDTO.getComments());
                serviceNode.setName(controllerServiceDTO.getName());

                if (controllerServiceDTO.getBulletinLevel() != null) {
                    serviceNode.setBulletinLevel(LogLevel.valueOf(controllerServiceDTO.getBulletinLevel()));
                } else {
                    // this situation exists for backward compatibility with nifi 1.16 and earlier where controller services do not have bulletinLevels set in flow.xml/flow.json
                    // and bulletinLevels are at the WARN level by default
                    serviceNode.setBulletinLevel(LogLevel.WARN);
                }

                if (!topLevel) {
                    serviceNode.setVersionedComponentId(controllerServiceDTO.getVersionedComponentId());
                }

                group.addControllerService(serviceNode);
            }

            // configure controller services. We do this after creating all of them in case 1 service
            // references another service.
            for (final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices()) {
                final String serviceId = controllerServiceDTO.getId();
                final ControllerServiceNode serviceNode = flowManager.getControllerServiceNode(serviceId);
                final Set<String> sensitiveDynamicPropertyNames = controllerServiceDTO.getSensitiveDynamicPropertyNames();
                serviceNode.setProperties(controllerServiceDTO.getProperties(), false, sensitiveDynamicPropertyNames == null ? Collections.emptySet() : sensitiveDynamicPropertyNames);
            }
        } finally {
            serviceNodes.forEach(ControllerServiceNode::resumeValidationTrigger);
        }

        //
        // Instantiate the labels
        //
        for (final LabelDTO labelDTO : dto.getLabels()) {
            final Label label = flowManager.createLabel(labelDTO.getId(), labelDTO.getLabel());
            label.setPosition(toPosition(labelDTO.getPosition()));
            if (labelDTO.getWidth() != null && labelDTO.getHeight() != null) {
                label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
            }

            label.setStyle(labelDTO.getStyle());
            if (labelDTO.getzIndex() != null) {
                // Apply the z-index carried by the DTO (not the label's own current value)
                label.setZIndex(labelDTO.getzIndex());
            }

            if (!topLevel) {
                label.setVersionedComponentId(labelDTO.getVersionedComponentId());
            }

            group.addLabel(label);
        }

        // Instantiate the funnels
        for (final FunnelDTO funnelDTO : dto.getFunnels()) {
            final Funnel funnel = flowManager.createFunnel(funnelDTO.getId());
            funnel.setPosition(toPosition(funnelDTO.getPosition()));
            if (!topLevel) {
                funnel.setVersionedComponentId(funnelDTO.getVersionedComponentId());
            }

            group.addFunnel(funnel);
        }

        //
        // Instantiate Input Ports & Output Ports
        //
        for (final PortDTO portDTO : dto.getInputPorts()) {
            final Port inputPort;
            // Ports in the root group, or ports explicitly allowing remote access, are created as public ports
            if (group.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
                final String portName = generatePublicInputPortName(flowManager, portDTO.getName());
                inputPort = flowManager.createPublicInputPort(portDTO.getId(), portName);
            } else {
                inputPort = flowManager.createLocalInputPort(portDTO.getId(), portDTO.getName());
            }

            if (!topLevel) {
                inputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
            }
            inputPort.setPosition(toPosition(portDTO.getPosition()));
            inputPort.setProcessGroup(group);
            inputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
            inputPort.setComments(portDTO.getComments());
            if (portDTO.getState().equals(ScheduledState.DISABLED.toString())) {
                inputPort.disable();
            }
            group.addInputPort(inputPort);
        }

        for (final PortDTO portDTO : dto.getOutputPorts()) {
            final Port outputPort;
            // Ports in the root group, or ports explicitly allowing remote access, are created as public ports
            if (group.isRootGroup() || Boolean.TRUE.equals(portDTO.getAllowRemoteAccess())) {
                final String portName = generatePublicOutputPortName(flowManager, portDTO.getName());
                outputPort = flowManager.createPublicOutputPort(portDTO.getId(), portName);
            } else {
                outputPort = flowManager.createLocalOutputPort(portDTO.getId(), portDTO.getName());
            }

            if (!topLevel) {
                outputPort.setVersionedComponentId(portDTO.getVersionedComponentId());
            }
            outputPort.setPosition(toPosition(portDTO.getPosition()));
            outputPort.setProcessGroup(group);
            outputPort.setMaxConcurrentTasks(portDTO.getConcurrentlySchedulableTaskCount());
            outputPort.setComments(portDTO.getComments());
            if (portDTO.getState().equals(ScheduledState.DISABLED.toString())) {
                outputPort.disable();
            }
            group.addOutputPort(outputPort);
        }

        //
        // Instantiate the processors
        //
        for (final ProcessorDTO processorDTO : dto.getProcessors()) {
            final BundleCoordinate bundleCoordinate = BundleUtils.getBundle(extensionManager, processorDTO.getType(), processorDTO.getBundle());
            final ProcessorNode procNode = flowManager.createProcessor(processorDTO.getType(), processorDTO.getId(), bundleCoordinate);
            // Validation is paused while the processor is being configured; it is resumed in the finally block below
            procNode.pauseValidationTrigger();

            try {
                procNode.setPosition(toPosition(processorDTO.getPosition()));
                procNode.setProcessGroup(group);
                if (!topLevel) {
                    procNode.setVersionedComponentId(processorDTO.getVersionedComponentId());
                }

                final ProcessorConfigDTO config = processorDTO.getConfig();
                procNode.setComments(config.getComments());
                if (config.isLossTolerant() != null) {
                    procNode.setLossTolerant(config.isLossTolerant());
                }
                procNode.setName(processorDTO.getName());

                procNode.setYieldPeriod(config.getYieldDuration());
                procNode.setPenalizationPeriod(config.getPenaltyDuration());
                procNode.setBulletinLevel(LogLevel.valueOf(config.getBulletinLevel()));
                procNode.setAnnotationData(config.getAnnotationData());
                procNode.setRetryCount(config.getRetryCount());
                procNode.setRetriedRelationships(config.getRetriedRelationships());
                if (config.getBackoffMechanism() != null) {
                    procNode.setBackoffMechanism(BackoffMechanism.valueOf(config.getBackoffMechanism()));
                }
                procNode.setMaxBackoffPeriod(config.getMaxBackoffPeriod());
                procNode.setStyle(processorDTO.getStyle());

                if (config.getRunDurationMillis() != null) {
                    procNode.setRunDuration(config.getRunDurationMillis(), TimeUnit.MILLISECONDS);
                }

                if (config.getSchedulingStrategy() != null) {
                    procNode.setSchedulingStrategy(SchedulingStrategy.valueOf(config.getSchedulingStrategy()));
                }

                if (config.getExecutionNode() != null) {
                    procNode.setExecutionNode(ExecutionNode.valueOf(config.getExecutionNode()));
                }

                if (processorDTO.getState().equals(ScheduledState.DISABLED.toString())) {
                    procNode.disable();
                }

                // ensure that the scheduling strategy is set prior to these values
                procNode.setMaxConcurrentTasks(config.getConcurrentlySchedulableTaskCount());
                procNode.setSchedulingPeriod(config.getSchedulingPeriod());

                // Collect only those relationships that the DTO marks as auto-terminated
                final Set<Relationship> relationships = new HashSet<>();
                if (processorDTO.getRelationships() != null) {
                    for (final RelationshipDTO rel : processorDTO.getRelationships()) {
                        if (rel.isAutoTerminate()) {
                            relationships.add(procNode.getRelationship(rel.getName()));
                        }
                    }
                    procNode.setAutoTerminatedRelationships(relationships);
                }

                // We need to add the processor to the ProcessGroup before calling ProcessorNode.setProperties. This will notify the FlowManager that the Processor
                // has been added to the flow, which is important before calling ProcessorNode.setProperties, since #setProperties may call methods that result in looking
                // up a Controller Service (such as #getClassloaderIsolationKey). The Processor must be registered with the FlowManager and its parent Process Group
                // before that can happen, in order to ensure that it has access to any referenced Controller Service.
                group.addProcessor(procNode);

                if (config.getProperties() != null) {
                    final Set<String> sensitiveDynamicPropertyNames = config.getSensitiveDynamicPropertyNames();
                    procNode.setProperties(config.getProperties(), false, sensitiveDynamicPropertyNames == null ? Collections.emptySet() : sensitiveDynamicPropertyNames);
                }

                // Notify the processor node that the configuration (properties, e.g.) has been restored
                final StandardProcessContext processContext = new StandardProcessContext(procNode, flowController.getControllerServiceProvider(),
                        flowController.getStateManagerProvider().getStateManager(procNode.getProcessor().getIdentifier()), () -> false, flowController);
                procNode.onConfigurationRestored(processContext);
            } finally {
                procNode.resumeValidationTrigger();
            }
        }

        //
        // Instantiate Remote Process Groups
        //
        for (final RemoteProcessGroupDTO remoteGroupDTO : dto.getRemoteProcessGroups()) {
            final RemoteProcessGroup remoteGroup = flowManager.createRemoteProcessGroup(remoteGroupDTO.getId(), remoteGroupDTO.getTargetUris());
            remoteGroup.setComments(remoteGroupDTO.getComments());
            remoteGroup.setPosition(toPosition(remoteGroupDTO.getPosition()));
            remoteGroup.setCommunicationsTimeout(remoteGroupDTO.getCommunicationsTimeout());
            remoteGroup.setYieldDuration(remoteGroupDTO.getYieldDuration());
            if (!topLevel) {
                remoteGroup.setVersionedComponentId(remoteGroupDTO.getVersionedComponentId());
            }

            // Fall back to the RAW transport protocol when the DTO does not specify one
            if (remoteGroupDTO.getTransportProtocol() == null) {
                remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.RAW);
            } else {
                remoteGroup.setTransportProtocol(SiteToSiteTransportProtocol.valueOf(remoteGroupDTO.getTransportProtocol()));
            }

            remoteGroup.setProxyHost(remoteGroupDTO.getProxyHost());
            remoteGroup.setProxyPort(remoteGroupDTO.getProxyPort());
            remoteGroup.setProxyUser(remoteGroupDTO.getProxyUser());
            remoteGroup.setProxyPassword(remoteGroupDTO.getProxyPassword());
            remoteGroup.setProcessGroup(group);

            // set the input/output ports
            if (remoteGroupDTO.getContents() != null) {
                final RemoteProcessGroupContentsDTO contents = remoteGroupDTO.getContents();

                // ensure there are input ports
                if (contents.getInputPorts() != null) {
                    remoteGroup.setInputPorts(convertRemotePort(contents.getInputPorts()), false);
                }

                // ensure there are output ports
                if (contents.getOutputPorts() != null) {
                    remoteGroup.setOutputPorts(convertRemotePort(contents.getOutputPorts()), false);
                }
            }

            group.addRemoteProcessGroup(remoteGroup);
        }

        //
        // Instantiate ProcessGroups
        //
        for (final ProcessGroupDTO groupDTO : dto.getProcessGroups()) {
            final ProcessGroup childGroup = flowManager.createProcessGroup(groupDTO.getId());
            childGroup.setParent(group);
            childGroup.setPosition(toPosition(groupDTO.getPosition()));
            childGroup.setComments(groupDTO.getComments());
            childGroup.setName(groupDTO.getName());
            childGroup.setExecutionEngine(ExecutionEngine.valueOf(groupDTO.getExecutionEngine()));
            childGroup.setStatelessFlowTimeout(groupDTO.getStatelessFlowTimeout());
            childGroup.setMaxConcurrentTasks(groupDTO.getMaxConcurrentTasks());

            final String flowfileConcurrentName = groupDTO.getFlowfileConcurrency();
            if (flowfileConcurrentName != null) {
                childGroup.setFlowFileConcurrency(FlowFileConcurrency.valueOf(flowfileConcurrentName));
            }

            final String outboundPolicyName = groupDTO.getFlowfileOutboundPolicy();
            if (outboundPolicyName != null) {
                childGroup.setFlowFileOutboundPolicy(FlowFileOutboundPolicy.valueOf(outboundPolicyName));
            }

            // Bind the Parameter Context only if the referenced context is known to the Parameter Context Manager
            final ParameterContextReferenceEntity parameterContextReference = groupDTO.getParameterContext();
            if (parameterContextReference != null) {
                final ParameterContext parameterContext = flowManager.getParameterContextManager().getParameterContext(parameterContextReference.getId());
                if (parameterContext != null) {
                    childGroup.setParameterContext(parameterContext);
                }
            }

            final String defaultFlowFileExpiration = groupDTO.getDefaultFlowFileExpiration();
            if (defaultFlowFileExpiration != null) {
                childGroup.setDefaultFlowFileExpiration(defaultFlowFileExpiration);
            }

            final Long defaultBackPressureObjectThreshold = groupDTO.getDefaultBackPressureObjectThreshold();
            if (defaultBackPressureObjectThreshold != null) {
                childGroup.setDefaultBackPressureObjectThreshold(defaultBackPressureObjectThreshold);
            }

            final String defaultBackPressureDataSizeThreshold = groupDTO.getDefaultBackPressureDataSizeThreshold();
            if (defaultBackPressureDataSizeThreshold != null) {
                childGroup.setDefaultBackPressureDataSizeThreshold(defaultBackPressureDataSizeThreshold);
            }

            final String logFileSuffix = groupDTO.getLogFileSuffix();
            if (logFileSuffix != null) {
                childGroup.setLogFileSuffix(logFileSuffix);
            }

            // If this Process Group is 'top level' then we do not set versioned component ID's.
            // We do this only if this component is the child of a Versioned Component.
            if (!topLevel) {
                childGroup.setVersionedComponentId(groupDTO.getVersionedComponentId());
            }

            group.addProcessGroup(childGroup);

            final FlowSnippetDTO contents = groupDTO.getContents();

            // we want this to be recursive, so we will create a new template that contains only
            // the contents of this child group and recursively call ourselves.
            final FlowSnippetDTO childTemplateDTO = new FlowSnippetDTO();
            childTemplateDTO.setConnections(contents.getConnections());
            childTemplateDTO.setInputPorts(contents.getInputPorts());
            childTemplateDTO.setLabels(contents.getLabels());
            childTemplateDTO.setOutputPorts(contents.getOutputPorts());
            childTemplateDTO.setProcessGroups(contents.getProcessGroups());
            childTemplateDTO.setProcessors(contents.getProcessors());
            childTemplateDTO.setFunnels(contents.getFunnels());
            childTemplateDTO.setRemoteProcessGroups(contents.getRemoteProcessGroups());
            childTemplateDTO.setControllerServices(contents.getControllerServices());

            final StandardFlowSnippet childSnippet = new StandardFlowSnippet(childTemplateDTO, extensionManager);
            childSnippet.instantiate(flowManager, flowController, childGroup, false);

            if (groupDTO.getVersionControlInformation() != null) {
                final VersionControlInformation vci = StandardVersionControlInformation.Builder
                    .fromDto(groupDTO.getVersionControlInformation())
                    .build();
                childGroup.setVersionControlInformation(vci, Collections.emptyMap());
            }
        }

        //
        // Instantiate Connections
        //
        for (final ConnectionDTO connectionDTO : dto.getConnections()) {
            final ConnectableDTO sourceDTO = connectionDTO.getSource();
            final ConnectableDTO destinationDTO = connectionDTO.getDestination();
            final Connectable source;
            final Connectable destination;

            // Locate the source and destination connectable. If this is a remote port we need to locate the remote process groups. Otherwise, we need to
            // find the connectable given its parent group.
            //
            // NOTE: (getConnectable returns ANY connectable, when the parent is not this group only input ports or output ports should be returned. If something
            // other than a port is returned, an exception will be thrown when adding the connection below.)

            // See if the source connectable is a remote port
            if (ConnectableType.REMOTE_OUTPUT_PORT.name().equals(sourceDTO.getType())) {
                final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(sourceDTO.getGroupId());
                source = remoteGroup.getOutputPort(sourceDTO.getId());
            } else {
                final ProcessGroup sourceGroup = getConnectableParent(group, sourceDTO.getGroupId(), flowManager);
                source = sourceGroup.getConnectable(sourceDTO.getId());
            }

            // see if the destination connectable is a remote port
            if (ConnectableType.REMOTE_INPUT_PORT.name().equals(destinationDTO.getType())) {
                final RemoteProcessGroup remoteGroup = group.getRemoteProcessGroup(destinationDTO.getGroupId());
                destination = remoteGroup.getInputPort(destinationDTO.getId());
            } else {
                final ProcessGroup destinationGroup = getConnectableParent(group, destinationDTO.getGroupId(), flowManager);
                destination = destinationGroup.getConnectable(destinationDTO.getId());
            }

            // determine the selection relationships for this connection
            final Set<String> relationships = new HashSet<>();
            if (connectionDTO.getSelectedRelationships() != null) {
                relationships.addAll(connectionDTO.getSelectedRelationships());
            }

            final Connection connection = flowManager.createConnection(connectionDTO.getId(), connectionDTO.getName(), source, destination, relationships);
            if (!topLevel) {
                connection.setVersionedComponentId(connectionDTO.getVersionedComponentId());
            }

            if (connectionDTO.getzIndex() != null) {
                // Apply the z-index carried by the DTO (not the connection's own current value)
                connection.setZIndex(connectionDTO.getzIndex());
            }

            if (connectionDTO.getBends() != null) {
                final List<Position> bendPoints = new ArrayList<>();
                for (final PositionDTO bend : connectionDTO.getBends()) {
                    bendPoints.add(new Position(bend.getX(), bend.getY()));
                }
                connection.setBendPoints(bendPoints);
            }

            final FlowFileQueue queue = connection.getFlowFileQueue();
            queue.setBackPressureDataSizeThreshold(connectionDTO.getBackPressureDataSizeThreshold());
            queue.setBackPressureObjectThreshold(connectionDTO.getBackPressureObjectThreshold());
            queue.setFlowFileExpiration(connectionDTO.getFlowFileExpiration());

            final List<String> prioritizers = connectionDTO.getPrioritizers();
            if (prioritizers != null) {
                final List<String> newPrioritizersClasses = new ArrayList<>(prioritizers);
                final List<FlowFilePrioritizer> newPrioritizers = new ArrayList<>();
                for (final String className : newPrioritizersClasses) {
                    try {
                        newPrioritizers.add(flowManager.createPrioritizer(className));
                    } catch (final ClassNotFoundException | InstantiationException | IllegalAccessException e) {
                        // Chain the original exception so that its stack trace is not lost
                        throw new IllegalArgumentException("Unable to set prioritizer " + className + ": " + e, e);
                    }
                }
                queue.setPriorities(newPrioritizers);
            }

            final String loadBalanceStrategyName = connectionDTO.getLoadBalanceStrategy();
            if (loadBalanceStrategyName != null) {
                final LoadBalanceStrategy loadBalanceStrategy = LoadBalanceStrategy.valueOf(loadBalanceStrategyName);
                final String partitioningAttribute = connectionDTO.getLoadBalancePartitionAttribute();
                queue.setLoadBalanceStrategy(loadBalanceStrategy, partitioningAttribute);
            }

            connection.setProcessGroup(group);
            group.addConnection(connection);
        }
    }