public Set getComponentsAffectedByFlowUpdate()

in nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java [5699:5901]


    /**
     * Determines which components of the local flow are affected by updating the given Process Group
     * to the flow contents carried by the supplied snapshot.
     *
     * <p>The local Process Group is mapped to its versioned representation and compared against the snapshot's
     * flow contents. The result is then assembled in three passes over the detected differences:</p>
     * <ol>
     *   <li>every relevant difference contributes the local component it refers to (or the enclosing group's
     *       entity when that component lives in a group whose execution engine resolves to STATELESS);</li>
     *   <li>removed Process Groups contribute all components found beneath them, and differences on Controller
     *       Services contribute the services and processors that recursively reference them;</li>
     *   <li>differences on connections contribute the locally existing source and destination connectables.</li>
     * </ol>
     *
     * @param processGroupId the identifier of the Process Group whose flow would be updated
     * @param updatedSnapshot the snapshot whose flow contents are compared against the local flow
     * @return a newly created, mutable set holding an entity for each affected component
     */
    public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final String processGroupId, final RegisteredFlowSnapshot updatedSnapshot) {
        final ProcessGroup targetGroup = processGroupDAO.getProcessGroup(processGroupId);

        // Map the group as it currently exists so that it can be compared with the snapshot's contents
        final NiFiRegistryFlowMapper flowMapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
        final VersionedProcessGroup currentContents = flowMapper.mapProcessGroup(targetGroup, controllerFacade.getControllerServiceProvider(), controllerFacade.getFlowManager(), true);

        final ComparableDataFlow currentFlow = new StandardComparableDataFlow("Current Flow", currentContents);
        final ComparableDataFlow incomingFlow = new StandardComparableDataFlow("New Flow", updatedSnapshot.getFlowContents());

        final FlowComparator comparator = new StandardFlowComparator(currentFlow, incomingFlow, targetGroup.getAncestorServiceIds(), new StaticDifferenceDescriptor(),
            Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP);
        final FlowComparison flowComparison = comparator.compare();

        final FlowManager flowManager = controllerFacade.getFlowManager();
        final Set<AffectedComponentEntity> affected = new HashSet<>();

        // Pass 1: the local component behind each relevant difference
        for (final FlowDifference candidate : flowComparison.getDifferences()) {
            // The checks short-circuit in this order. Added components are skipped first because a component
            // that does not exist locally yet cannot be affected in the local flow.
            final boolean irrelevant = candidate.getDifferenceType() == DifferenceType.COMPONENT_ADDED
                || !FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS.test(candidate)
                || FlowDifferenceFilters.isNewPropertyWithDefaultValue(candidate, flowManager)
                || FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(candidate, incomingFlow.getContents(), flowManager)
                || FlowDifferenceFilters.isScheduledStateNew(candidate)
                || FlowDifferenceFilters.isLocalScheduleStateChange(candidate)
                || FlowDifferenceFilters.isPropertyMissingFromGhostComponent(candidate, flowManager)
                || candidate.getDifferenceType() == DifferenceType.POSITION_CHANGED;
            if (irrelevant) {
                continue;
            }

            final VersionedComponent existing = candidate.getComponentA();

            // Both stay null for component types without a tracked state and for ports that cannot be found locally
            String currentState = null;
            ProcessGroup owningGroup = null;
            switch (existing.getComponentType()) {
                case CONTROLLER_SERVICE: {
                    final ControllerServiceNode service = controllerServiceDAO.getControllerService(existing.getInstanceIdentifier());
                    owningGroup = service.getProcessGroup();
                    currentState = service.getState().name();
                    break;
                }
                case PROCESSOR: {
                    final ProcessorNode processor = processorDAO.getProcessor(existing.getInstanceIdentifier());
                    owningGroup = processor.getProcessGroup();
                    currentState = processor.getPhysicalScheduledState().name();
                    break;
                }
                case REMOTE_INPUT_PORT: {
                    final InstantiatedVersionedRemoteGroupPort versionedPort = (InstantiatedVersionedRemoteGroupPort) existing;
                    final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(versionedPort.getInstanceGroupId());
                    owningGroup = remoteGroup.getProcessGroup();
                    currentState = remoteGroup.getInputPort(versionedPort.getInstanceIdentifier()).getScheduledState().name();
                    break;
                }
                case REMOTE_OUTPUT_PORT: {
                    final InstantiatedVersionedRemoteGroupPort versionedPort = (InstantiatedVersionedRemoteGroupPort) existing;
                    final RemoteProcessGroup remoteGroup = remoteProcessGroupDAO.getRemoteProcessGroup(versionedPort.getInstanceGroupId());
                    owningGroup = remoteGroup.getProcessGroup();
                    currentState = remoteGroup.getOutputPort(versionedPort.getInstanceIdentifier()).getScheduledState().name();
                    break;
                }
                case INPUT_PORT: {
                    final Port port = getInputPort((InstantiatedVersionedPort) existing);
                    if (port != null) {
                        owningGroup = port.getProcessGroup();
                        currentState = port.getScheduledState().name();
                    }
                    break;
                }
                case OUTPUT_PORT: {
                    final Port port = getOutputPort((InstantiatedVersionedPort) existing);
                    if (port != null) {
                        owningGroup = port.getProcessGroup();
                        currentState = port.getScheduledState().name();
                    }
                    break;
                }
                default:
                    break;
            }

            // A component inside a stateless group is represented by the group's entity rather than its own
            if (owningGroup != null && owningGroup.resolveExecutionEngine() == ExecutionEngine.STATELESS) {
                affected.add(createStatelessGroupAffectedComponentEntity(owningGroup));
            } else {
                affected.add(createAffectedComponentEntity((InstantiatedVersionedComponent) existing, existing.getComponentType().name(), currentState));
            }
        }

        // Pass 2: contents of removed Process Groups and components referencing changed Controller Services
        for (final FlowDifference difference : flowComparison.getDifferences()) {
            // Skip added/removed remote ports, name changes to public ports, new properties that carry their
            // default value and new relationships that are auto-terminated by default
            if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)
                || FlowDifferenceFilters.isPublicPortNameChange(difference)
                || FlowDifferenceFilters.isNewPropertyWithDefaultValue(difference, controllerFacade.getFlowManager())
                || FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(difference, updatedSnapshot.getFlowContents(), controllerFacade.getFlowManager())) {
                continue;
            }

            final VersionedComponent existing = difference.getComponentA();
            if (existing == null) {
                continue;
            }

            final org.apache.nifi.flow.ComponentType existingType = existing.getComponentType();

            // When a Process Group is removed, everything beneath that Process Group is an affected component
            if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && existingType == org.apache.nifi.flow.ComponentType.PROCESS_GROUP) {
                final String removedGroupId = ((InstantiatedVersionedProcessGroup) existing).getInstanceIdentifier();
                final ProcessGroup removedGroup = processGroupDAO.getProcessGroup(removedGroupId);

                removedGroup.findAllProcessors().forEach(processor -> affected.add(createAffectedComponentEntity(processor)));
                removedGroup.findAllFunnels().forEach(funnel -> affected.add(createAffectedComponentEntity(funnel)));
                removedGroup.findAllInputPorts().forEach(port -> affected.add(createAffectedComponentEntity(port)));
                removedGroup.findAllOutputPorts().forEach(port -> affected.add(createAffectedComponentEntity(port)));
                for (final RemoteProcessGroup remoteGroup : removedGroup.findAllRemoteProcessGroups()) {
                    remoteGroup.getInputPorts().forEach(port -> affected.add(createAffectedComponentEntity(port)));
                    remoteGroup.getOutputPorts().forEach(port -> affected.add(createAffectedComponentEntity(port)));
                }
                removedGroup.findAllControllerServices().forEach(service -> affected.add(createAffectedComponentEntity(service)));
            }

            // A difference on a Controller Service also affects whatever references that service, recursively
            if (existingType == org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE) {
                final ControllerServiceNode changedService = controllerServiceDAO.getControllerService(existing.getInstanceIdentifier());

                for (final ControllerServiceNode dependentService : changedService.getReferences().findRecursiveReferences(ControllerServiceNode.class)) {
                    affected.add(createAffectedComponentEntity(dependentService));
                }

                for (final ProcessorNode dependentProcessor : changedService.getReferences().findRecursiveReferences(ProcessorNode.class)) {
                    affected.add(createAffectedComponentEntity(dependentProcessor));
                }
            }
        }

        // Index every connectable component of the group by its versioned component ID
        final Map<String, List<Connectable>> connectablesByVersionedId = new HashMap<>();
        mapToConnectableId(targetGroup.findAllFunnels(), connectablesByVersionedId);
        mapToConnectableId(targetGroup.findAllInputPorts(), connectablesByVersionedId);
        mapToConnectableId(targetGroup.findAllOutputPorts(), connectablesByVersionedId);
        mapToConnectableId(targetGroup.findAllProcessors(), connectablesByVersionedId);

        final List<RemoteGroupPort> remoteGroupPorts = new ArrayList<>();
        targetGroup.findAllRemoteProcessGroups().forEach(remoteGroup -> {
            remoteGroupPorts.addAll(remoteGroup.getInputPorts());
            remoteGroupPorts.addAll(remoteGroup.getOutputPorts());
        });
        mapToConnectableId(remoteGroupPorts, connectablesByVersionedId);

        // Pass 3: for every difference that concerns a connection, both the source and the destination need to be
        // stopped, provided that they currently exist in the flow.
        for (final FlowDifference difference : flowComparison.getDifferences()) {
            // Fall back to the proposed side when the connection has no local counterpart
            final VersionedComponent localSide = difference.getComponentA();
            final VersionedComponent component = localSide == null ? difference.getComponentB() : localSide;

            if (component.getComponentType() == org.apache.nifi.flow.ComponentType.CONNECTION) {
                final VersionedConnection connection = (VersionedConnection) component;

                final List<Connectable> sources = connectablesByVersionedId.get(connection.getSource().getId());
                if (sources != null) {
                    sources.forEach(source -> affected.add(createAffectedComponentEntity(source)));
                }

                final List<Connectable> destinations = connectablesByVersionedId.get(connection.getDestination().getId());
                if (destinations != null) {
                    destinations.forEach(destination -> affected.add(createAffectedComponentEntity(destination)));
                }
            }
        }

        return affected;
    }