in nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java [5699:5901]
/**
 * Determines which components of the local flow are affected when the given Process Group is
 * updated to the flow contained in the supplied snapshot.
 * <p>
 * The local group is mapped to its versioned representation and compared against
 * {@code updatedSnapshot.getFlowContents()}. The result is assembled in three passes over the
 * differences reported by that comparison:
 * <ol>
 *   <li>every difference that survives the filters is mapped to an entity describing its local
 *       component (or the enclosing group, when that group resolves to the STATELESS execution engine);</li>
 *   <li>for a removed Process Group all components below it are added, and for any Controller Service
 *       difference all recursively referencing services and processors are added;</li>
 *   <li>for any difference on a connection, the locally existing source and destination are added.</li>
 * </ol>
 *
 * @param processGroupId identifier of the local Process Group that is to be updated
 * @param updatedSnapshot snapshot whose flow contents are the proposed new flow
 * @return a mutable {@link HashSet} of the affected components
 */
public Set<AffectedComponentEntity> getComponentsAffectedByFlowUpdate(final String processGroupId, final RegisteredFlowSnapshot updatedSnapshot) {
    final ProcessGroup group = processGroupDAO.getProcessGroup(processGroupId);

    // Build the versioned view of the local flow and compare it against the proposed flow.
    final NiFiRegistryFlowMapper mapper = makeNiFiRegistryFlowMapper(controllerFacade.getExtensionManager());
    final VersionedProcessGroup localContents = mapper.mapProcessGroup(group, controllerFacade.getControllerServiceProvider(), controllerFacade.getFlowManager(), true);

    final ComparableDataFlow localFlow = new StandardComparableDataFlow("Current Flow", localContents);
    final ComparableDataFlow proposedFlow = new StandardComparableDataFlow("New Flow", updatedSnapshot.getFlowContents());

    final Set<String> ancestorServiceIds = group.getAncestorServiceIds();
    final FlowComparator flowComparator = new StandardFlowComparator(localFlow, proposedFlow, ancestorServiceIds, new StaticDifferenceDescriptor(),
        Function.identity(), VersionedComponent::getIdentifier, FlowComparatorVersionedStrategy.DEEP);
    final FlowComparison comparison = flowComparator.compare();

    final FlowManager flowManager = controllerFacade.getFlowManager();

    // Pass 1: map each relevant difference to the local component it touches.
    final Set<AffectedComponentEntity> affectedComponents = comparison.getDifferences().stream()
        .filter(difference -> difference.getDifferenceType() != DifferenceType.COMPONENT_ADDED) // components that are added are not components that will be affected in the local flow.
        .filter(FlowDifferenceFilters.FILTER_ADDED_REMOVED_REMOTE_PORTS)
        .filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
        .filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, proposedFlow.getContents(), flowManager))
        .filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
        .filter(diff -> !FlowDifferenceFilters.isLocalScheduleStateChange(diff))
        .filter(diff -> !FlowDifferenceFilters.isPropertyMissingFromGhostComponent(diff, flowManager))
        .filter(difference -> difference.getDifferenceType() != DifferenceType.POSITION_CHANGED)
        .map(this::mapDifferenceToAffectedComponent)
        .collect(Collectors.toCollection(HashSet::new));

    // Pass 2: expand removed Process Groups and Controller Service differences to the components that depend on them.
    for (final FlowDifference difference : comparison.getDifferences()) {
        // Ignore differences for adding remote ports
        if (FlowDifferenceFilters.isAddedOrRemovedRemotePort(difference)) {
            continue;
        }

        // Ignore name changes to public ports
        if (FlowDifferenceFilters.isPublicPortNameChange(difference)) {
            continue;
        }

        // Reuse the FlowManager captured above rather than fetching it again for every difference.
        if (FlowDifferenceFilters.isNewPropertyWithDefaultValue(difference, flowManager)) {
            continue;
        }

        if (FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(difference, updatedSnapshot.getFlowContents(), flowManager)) {
            continue;
        }

        final VersionedComponent localComponent = difference.getComponentA();
        if (localComponent == null) {
            continue;
        }

        // If any Process Group is removed, consider all components below that Process Group as an affected component
        if (difference.getDifferenceType() == DifferenceType.COMPONENT_REMOVED && localComponent.getComponentType() == org.apache.nifi.flow.ComponentType.PROCESS_GROUP) {
            final String localGroupId = ((InstantiatedVersionedProcessGroup) localComponent).getInstanceIdentifier();
            final ProcessGroup localGroup = processGroupDAO.getProcessGroup(localGroupId);
            addComponentsOfRemovedGroup(localGroup, affectedComponents);
        }

        if (localComponent.getComponentType() == org.apache.nifi.flow.ComponentType.CONTROLLER_SERVICE) {
            final String serviceId = localComponent.getInstanceIdentifier();
            final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
            addComponentsReferencingService(serviceNode, affectedComponents);
        }
    }

    // Create a map of all connectable components by versioned component ID to the connectable component itself
    final Map<String, List<Connectable>> connectablesByVersionId = new HashMap<>();
    mapToConnectableId(group.findAllFunnels(), connectablesByVersionId);
    mapToConnectableId(group.findAllInputPorts(), connectablesByVersionId);
    mapToConnectableId(group.findAllOutputPorts(), connectablesByVersionId);
    mapToConnectableId(group.findAllProcessors(), connectablesByVersionId);

    final List<RemoteGroupPort> remotePorts = new ArrayList<>();
    for (final RemoteProcessGroup rpg : group.findAllRemoteProcessGroups()) {
        remotePorts.addAll(rpg.getInputPorts());
        remotePorts.addAll(rpg.getOutputPorts());
    }
    mapToConnectableId(remotePorts, connectablesByVersionId);

    // Pass 3: for any difference on a connection, we need to stop both the source (if it exists in the flow currently)
    // and the destination (if it exists in the flow currently).
    for (final FlowDifference difference : comparison.getDifferences()) {
        VersionedComponent component = difference.getComponentA();
        if (component == null) {
            component = difference.getComponentB();
        }

        // Neither side of the difference carries a component, so there is nothing to inspect.
        if (component == null) {
            continue;
        }

        if (component.getComponentType() != org.apache.nifi.flow.ComponentType.CONNECTION) {
            continue;
        }

        final VersionedConnection connection = (VersionedConnection) component;
        addConnectionEndpoints(connectablesByVersionId.get(connection.getSource().getId()), affectedComponents);
        addConnectionEndpoints(connectablesByVersionId.get(connection.getDestination().getId()), affectedComponents);
    }

    return affectedComponents;
}

/**
 * Maps a single flow difference to the entity describing the local component (Component A) it applies to.
 * If the component's Process Group resolves to the STATELESS execution engine, the entity for that group
 * is returned instead of one for the individual component.
 *
 * @param difference a difference whose Component A is present (callers filter out COMPONENT_ADDED differences)
 * @return the entity describing the affected local component or its stateless group
 */
private AffectedComponentEntity mapDifferenceToAffectedComponent(final FlowDifference difference) {
    final VersionedComponent localComponent = difference.getComponentA();

    final String state;
    final ProcessGroup localGroup;

    // Resolve the live component to obtain its current state and owning group; types not listed carry neither.
    switch (localComponent.getComponentType()) {
        case CONTROLLER_SERVICE:
            final String serviceId = localComponent.getInstanceIdentifier();
            final ControllerServiceNode serviceNode = controllerServiceDAO.getControllerService(serviceId);
            localGroup = serviceNode.getProcessGroup();
            state = serviceNode.getState().name();
            break;
        case PROCESSOR:
            final String processorId = localComponent.getInstanceIdentifier();
            final ProcessorNode procNode = processorDAO.getProcessor(processorId);
            localGroup = procNode.getProcessGroup();
            state = procNode.getPhysicalScheduledState().name();
            break;
        case REMOTE_INPUT_PORT:
            final InstantiatedVersionedRemoteGroupPort remoteInputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
            final RemoteProcessGroup inputPortRpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteInputPort.getInstanceGroupId());
            localGroup = inputPortRpg.getProcessGroup();
            state = inputPortRpg.getInputPort(remoteInputPort.getInstanceIdentifier()).getScheduledState().name();
            break;
        case REMOTE_OUTPUT_PORT:
            final InstantiatedVersionedRemoteGroupPort remoteOutputPort = (InstantiatedVersionedRemoteGroupPort) localComponent;
            final RemoteProcessGroup outputPortRpg = remoteProcessGroupDAO.getRemoteProcessGroup(remoteOutputPort.getInstanceGroupId());
            localGroup = outputPortRpg.getProcessGroup();
            state = outputPortRpg.getOutputPort(remoteOutputPort.getInstanceIdentifier()).getScheduledState().name();
            break;
        case INPUT_PORT:
            final InstantiatedVersionedPort versionedInputPort = (InstantiatedVersionedPort) localComponent;
            final Port inputPort = getInputPort(versionedInputPort);
            if (inputPort == null) {
                localGroup = null;
                state = null;
            } else {
                localGroup = inputPort.getProcessGroup();
                state = inputPort.getScheduledState().name();
            }
            break;
        case OUTPUT_PORT:
            final InstantiatedVersionedPort versionedOutputPort = (InstantiatedVersionedPort) localComponent;
            final Port outputPort = getOutputPort(versionedOutputPort);
            if (outputPort == null) {
                localGroup = null;
                state = null;
            } else {
                localGroup = outputPort.getProcessGroup();
                state = outputPort.getScheduledState().name();
            }
            break;
        default:
            state = null;
            localGroup = null;
            break;
    }

    if (localGroup != null && localGroup.resolveExecutionEngine() == ExecutionEngine.STATELESS) {
        return createStatelessGroupAffectedComponentEntity(localGroup);
    }

    return createAffectedComponentEntity((InstantiatedVersionedComponent) localComponent, localComponent.getComponentType().name(), state);
}

/**
 * Adds every processor, funnel, input port, output port, remote group port and controller service
 * found anywhere below the given (removed) Process Group to the set of affected components.
 *
 * @param removedGroup the local Process Group that is removed by the update
 * @param affectedComponents the set to add the entities to
 */
private void addComponentsOfRemovedGroup(final ProcessGroup removedGroup, final Set<AffectedComponentEntity> affectedComponents) {
    removedGroup.findAllProcessors().stream()
        .map(comp -> createAffectedComponentEntity(comp))
        .forEach(affectedComponents::add);
    removedGroup.findAllFunnels().stream()
        .map(comp -> createAffectedComponentEntity(comp))
        .forEach(affectedComponents::add);
    removedGroup.findAllInputPorts().stream()
        .map(comp -> createAffectedComponentEntity(comp))
        .forEach(affectedComponents::add);
    removedGroup.findAllOutputPorts().stream()
        .map(comp -> createAffectedComponentEntity(comp))
        .forEach(affectedComponents::add);
    removedGroup.findAllRemoteProcessGroups().stream()
        .flatMap(rpg -> Stream.concat(rpg.getInputPorts().stream(), rpg.getOutputPorts().stream()))
        .map(comp -> createAffectedComponentEntity(comp))
        .forEach(affectedComponents::add);
    removedGroup.findAllControllerServices().stream()
        .map(comp -> createAffectedComponentEntity(comp))
        .forEach(affectedComponents::add);
}

/**
 * Adds all controller services and processors that reference the given service, directly or
 * through other references, to the set of affected components.
 *
 * @param serviceNode the local controller service that a difference applies to
 * @param affectedComponents the set to add the entities to
 */
private void addComponentsReferencingService(final ControllerServiceNode serviceNode, final Set<AffectedComponentEntity> affectedComponents) {
    final List<ControllerServiceNode> referencingServices = serviceNode.getReferences().findRecursiveReferences(ControllerServiceNode.class);
    for (final ControllerServiceNode referencingService : referencingServices) {
        affectedComponents.add(createAffectedComponentEntity(referencingService));
    }

    final List<ProcessorNode> referencingProcessors = serviceNode.getReferences().findRecursiveReferences(ProcessorNode.class);
    for (final ProcessorNode referencingProcessor : referencingProcessors) {
        affectedComponents.add(createAffectedComponentEntity(referencingProcessor));
    }
}

/**
 * Adds an entity for each of the given connectables to the set of affected components.
 *
 * @param endpoints the local connectables registered for one end of a connection, or {@code null} if none exist locally
 * @param affectedComponents the set to add the entities to
 */
private void addConnectionEndpoints(final List<Connectable> endpoints, final Set<AffectedComponentEntity> affectedComponents) {
    if (endpoints == null) {
        return;
    }

    for (final Connectable endpoint : endpoints) {
        affectedComponents.add(createAffectedComponentEntity(endpoint));
    }
}