in nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowUpdateResource.java [333:563]
private void updateFlow(final String groupId, final ComponentLifecycle componentLifecycle, final URI requestUri,
final Set<AffectedComponentEntity> affectedComponents, final boolean replicateRequest,
final String replicateUriPath, final Revision revision, final T requestEntity,
final RegisteredFlowSnapshot flowSnapshot, final AsynchronousWebRequest<T, T> asyncRequest,
final String idGenerationSeed, final boolean allowDirtyFlowUpdate)
throws LifecycleManagementException, ResumeFlowException {
// Steps 5-6: Determine which components must be stopped and stop them.
final Set<String> stoppableReferenceTypes = new HashSet<>();
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
stoppableReferenceTypes.add(AffectedComponentDTO.COMPONENT_TYPE_STATELESS_GROUP);
final Set<AffectedComponentEntity> runningComponents = affectedComponents.stream()
.filter(entity -> stoppableReferenceTypes.contains(entity.getComponent().getReferenceType()))
.filter(entity -> isActive(entity.getComponent()))
.collect(Collectors.toSet());
logger.info("Stopping {} Processors", runningComponents.size());
final CancellableTimedPause stopComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(stopComponentsPause::cancel);
componentLifecycle.scheduleComponents(requestUri, groupId, runningComponents, ScheduledState.STOPPED, stopComponentsPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return;
}
asyncRequest.markStepComplete();
// Steps 7-8. Disable enabled controller services that are affected.
// We don't want to disable services that are already disabling. But we need to wait for their state to transition from Disabling to Disabled.
final Set<AffectedComponentEntity> servicesToWaitFor = affectedComponents.stream()
.filter(dto -> AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(dto.getComponent().getReferenceType()))
.filter(dto -> {
final String state = dto.getComponent().getState();
return "Enabled".equalsIgnoreCase(state) || "Enabling".equalsIgnoreCase(state) || "Disabling".equalsIgnoreCase(state);
})
.collect(Collectors.toSet());
final Set<AffectedComponentEntity> enabledServices = servicesToWaitFor.stream()
.filter(dto -> {
final String state = dto.getComponent().getState();
return "Enabling".equalsIgnoreCase(state) || "Enabled".equalsIgnoreCase(state);
})
.collect(Collectors.toSet());
logger.info("Disabling {} Controller Services", enabledServices.size());
final CancellableTimedPause disableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(disableServicesPause::cancel);
componentLifecycle.activateControllerServices(requestUri, groupId, enabledServices, servicesToWaitFor, ControllerServiceState.DISABLED, disableServicesPause, InvalidComponentAction.SKIP);
if (asyncRequest.isCancelled()) {
return;
}
asyncRequest.markStepComplete();
// Get the Original Flow Snapshot in case we fail to update and need to rollback
// This only applies to flows that were under version control, update may be called without version control
final VersionControlInformationEntity vciEntity = serviceFacade.getVersionControlInformation(groupId);
final RegisteredFlowSnapshot originalFlowSnapshot;
if (vciEntity == null) {
originalFlowSnapshot = null;
} else {
final FlowSnapshotContainer originalFlowSnapshotContainer = serviceFacade.getVersionedFlowSnapshot(vciEntity.getVersionControlInformation(), true);
originalFlowSnapshot = originalFlowSnapshotContainer.getFlowSnapshot();
}
try {
if (replicateRequest) {
// If replicating request, steps 9-11 are performed on each node individually
final URI replicateUri = buildUri(requestUri, replicateUriPath, null);
final NiFiUser user = NiFiUserUtils.getNiFiUser();
try {
final NodeResponse clusterResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, flowSnapshot);
verifyResponseCode(clusterResponse, replicateUri, user, "update");
} catch (final Exception e) {
if (originalFlowSnapshot == null) {
logger.debug("Failed to update flow but could not determine original flow to rollback to so will not make any attempt to revert the flow.");
} else {
try {
final NodeResponse rollbackResponse = replicateFlowUpdateRequest(replicateUri, user, requestEntity, revision, originalFlowSnapshot);
verifyResponseCode(rollbackResponse, replicateUri, user, "rollback");
} catch (final Exception inner) {
e.addSuppressed(inner);
}
}
throw e;
}
} else {
// Step 9: Ensure that if any connection exists in the flow and does not exist in the proposed snapshot,
// that it has no data in it. Ensure that no Input Port was removed, unless it currently has no incoming connections.
// Ensure that no Output Port was removed, unless it currently has no outgoing connections.
serviceFacade.verifyCanUpdate(groupId, flowSnapshot, true, !allowDirtyFlowUpdate);
// Get an updated Revision for the Process Group. If the group is stateless and was stopped for the update, its Revision may have changed.
// As a result, we need to ensure that we have the most up-to-date revision for the group.
final RevisionDTO currentGroupRevisionDto = serviceFacade.getProcessGroup(groupId).getRevision();
final Revision currentGroupRevision = new Revision(currentGroupRevisionDto.getVersion(), currentGroupRevisionDto.getClientId(), groupId);
// Step 10-11. Update Process Group to the new flow.
// Each concrete class defines its own update flow functionality
try {
performUpdateFlow(groupId, currentGroupRevision, requestEntity, flowSnapshot, idGenerationSeed, !allowDirtyFlowUpdate, true);
} catch (final Exception e) {
// If clustered, just throw the original Exception.
// Otherwise, rollback the flow update. We do not perform the rollback if clustered because
// we want this to be handled at a higher level, allowing the request to replace our flow version to come from the coordinator
// if any node fails to perform the update.
if (isClustered()) {
throw e;
}
// Rollback the update to the original flow snapshot. If there's any Exception, add it as a Suppressed Exception to the original so
// that it can be logged but not overtake the original Exception as the cause.
logger.error("Failed to update Process Group {}; will attempt to rollback any changes", groupId, e);
try {
performUpdateFlow(groupId, currentGroupRevision, requestEntity, originalFlowSnapshot, idGenerationSeed, false, true);
} catch (final Exception inner) {
e.addSuppressed(inner);
}
throw e;
}
}
} finally {
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Re-Enabling {} Controller Services: {}", enabledServices.size(), enabledServices);
}
asyncRequest.markStepComplete();
// Step 12. Re-enable all disabled controller services
final CancellableTimedPause enableServicesPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(enableServicesPause::cancel);
final Set<AffectedComponentEntity> servicesToEnable = getUpdatedEntities(enabledServices);
logger.info("Successfully updated flow; re-enabling {} Controller Services", servicesToEnable.size());
try {
componentLifecycle.activateControllerServices(requestUri, groupId, servicesToEnable, servicesToEnable,
ControllerServiceState.ENABLED, enableServicesPause, InvalidComponentAction.SKIP);
} catch (final IllegalStateException ise) {
// Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
throw new ResumeFlowException("Successfully updated flow but could not re-enable all Controller Services because " + ise.getMessage(), ise);
}
}
if (!asyncRequest.isCancelled()) {
if (logger.isDebugEnabled()) {
logger.debug("Restarting {} Processors: {}", runningComponents.size(), runningComponents);
}
asyncRequest.markStepComplete();
// Step 13. Restart all components
final Set<AffectedComponentEntity> componentsToStart = getUpdatedEntities(runningComponents);
// If there are any Remote Group Ports that are supposed to be started and have no connections, we want to remove those from our Set.
// This will happen if the Remote Group Port is transmitting when the flow change happens but the new flow does not have a connection
// to the port. In such a case, the Port still is included in the Updated Entities because we do not remove them when updating the flow
// (they are removed in the background).
final Set<AffectedComponentEntity> avoidStarting = new HashSet<>();
for (final AffectedComponentEntity componentEntity : componentsToStart) {
final AffectedComponentDTO componentDto = componentEntity.getComponent();
final String referenceType = componentDto.getReferenceType();
boolean startComponent = true;
try {
switch (referenceType) {
case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT:
case AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT: {
startComponent = serviceFacade.isRemoteGroupPortConnected(componentDto.getProcessGroupId(), componentDto.getId());
break;
}
case AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR: {
final ProcessorEntity entity = serviceFacade.getProcessor(componentEntity.getId());
if (entity == null || DISABLED_COMPONENT_STATE.equals(entity.getComponent().getState())) {
startComponent = false;
}
break;
}
case AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT: {
final PortEntity entity = serviceFacade.getInputPort(componentEntity.getId());
if (entity == null || DISABLED_COMPONENT_STATE.equals(entity.getComponent().getState())) {
startComponent = false;
}
break;
}
case AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT: {
final PortEntity entity = serviceFacade.getOutputPort(componentEntity.getId());
if (entity == null || DISABLED_COMPONENT_STATE.equals(entity.getComponent().getState())) {
startComponent = false;
}
break;
}
}
} catch (final ResourceNotFoundException rnfe) {
// Could occur if RPG is refreshed at just the right time.
startComponent = false;
}
// We must add the components to avoid starting to a separate Set and then remove them below,
// rather than removing the component here, because doing so would result in a ConcurrentModificationException.
if (!startComponent) {
avoidStarting.add(componentEntity);
}
}
componentsToStart.removeAll(avoidStarting);
final CancellableTimedPause startComponentsPause = new CancellableTimedPause(250, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
asyncRequest.setCancelCallback(startComponentsPause::cancel);
logger.info("Restarting {} Processors", componentsToStart.size());
try {
componentLifecycle.scheduleComponents(requestUri, groupId, componentsToStart, ScheduledState.RUNNING, startComponentsPause, InvalidComponentAction.SKIP);
} catch (final IllegalStateException ise) {
// Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
// a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
throw new ResumeFlowException("Successfully updated flow but could not restart all Processors because " + ise.getMessage(), ise);
}
}
}
asyncRequest.setCancelCallback(null);
}