in nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProcessGroupResource.java [478:645]
/*
 * Updates the process group identified by the path id and, when the requested update strategy is
 * ALL_DESCENDANTS, every group returned by serviceFacade.getProcessGroups(...) for that id.
 *
 * Flow:
 *   1. Validate the request body (component, revision, id match, position, update strategy,
 *      execution engine, stateless flow timeout, max concurrent tasks).
 *   2. Hand the request to replicate(...) when isReplicateRequest() is true; otherwise, when the node
 *      is disconnected from the cluster, verify that the caller acknowledged that.
 *   3. Collect the entities to update together with their revisions.
 *   4. Under withWriteLock(...): authorize, verify, then apply the update to every collected group.
 *
 * Parameters:
 *   id                        - the process group id taken from the request path
 *   requestProcessGroupEntity - the proposed configuration; must carry a component and a revision
 *
 * Returns the OK response built from the updated entity whose id equals the requested id, with the
 * component's contents removed.
 *
 * Throws IllegalArgumentException when any of the validations in step 1 fails.
 */
public Response updateProcessGroup(
        @Parameter(
                description = "The process group id.",
                required = true
        )
        @PathParam("id") final String id,
        @Parameter(
                description = "The process group configuration details.",
                required = true
        ) final ProcessGroupEntity requestProcessGroupEntity) {

    if (requestProcessGroupEntity == null || requestProcessGroupEntity.getComponent() == null) {
        throw new IllegalArgumentException("Process group details must be specified.");
    }

    if (requestProcessGroupEntity.getRevision() == null) {
        throw new IllegalArgumentException("Revision must be specified.");
    }

    // ensure the same id is being used
    final ProcessGroupDTO requestProcessGroupDTO = requestProcessGroupEntity.getComponent();
    if (!id.equals(requestProcessGroupDTO.getId())) {
        throw new IllegalArgumentException(String.format("The process group id (%s) in the request body does "
                + "not equal the process group id of the requested resource (%s).", requestProcessGroupDTO.getId(), id));
    }

    // a position is optional, but when supplied both coordinates are required
    final PositionDTO proposedPosition = requestProcessGroupDTO.getPosition();
    if (proposedPosition != null) {
        if (proposedPosition.getX() == null || proposedPosition.getY() == null) {
            throw new IllegalArgumentException("The x and y coordinate of the proposed position must be specified.");
        }
    }

    // an absent strategy defaults to DIRECT_CHILDREN; an unknown one is rejected with the same style of
    // message as the other enum-like fields instead of the raw "No enum constant ..." text
    final String processGroupUpdateStrategy = requestProcessGroupEntity.getProcessGroupUpdateStrategy();
    final ProcessGroupRecursivity updateStrategy;
    if (processGroupUpdateStrategy == null) {
        updateStrategy = ProcessGroupRecursivity.DIRECT_CHILDREN;
    } else {
        try {
            updateStrategy = ProcessGroupRecursivity.valueOf(processGroupUpdateStrategy);
        } catch (final IllegalArgumentException iae) {
            throw new IllegalArgumentException("Illegal value proposed for Process Group Update Strategy: " + processGroupUpdateStrategy, iae);
        }
    }

    // the execution engine is only validated here; the parsed value itself is not used
    final String executionEngine = requestProcessGroupDTO.getExecutionEngine();
    if (executionEngine != null) {
        try {
            ExecutionEngine.valueOf(executionEngine);
        } catch (final IllegalArgumentException iae) {
            // keep the original failure as the cause so it is not lost when diagnosing
            throw new IllegalArgumentException("Illegal value proposed for Execution Engine: " + executionEngine, iae);
        }
    }

    // the timeout is only validated here; the parsed duration itself is not used
    final String statelessTimeout = requestProcessGroupDTO.getStatelessFlowTimeout();
    if (statelessTimeout != null) {
        try {
            FormatUtils.getPreciseTimeDuration(statelessTimeout, TimeUnit.MILLISECONDS);
        } catch (final Exception e) {
            // keep the original failure as the cause so it is not lost when diagnosing
            throw new IllegalArgumentException("Illegal value proposed for Stateless Flow Timeout: " + statelessTimeout, e);
        }
    }

    final Integer maxConcurrentTasks = requestProcessGroupDTO.getMaxConcurrentTasks();
    if (maxConcurrentTasks != null && maxConcurrentTasks < 1) {
        throw new IllegalArgumentException("Illegal value proposed for Max Concurrent Tasks: " + maxConcurrentTasks);
    }

    if (isReplicateRequest()) {
        return replicate(HttpMethod.PUT, requestProcessGroupEntity);
    } else if (isDisconnectedFromCluster()) {
        verifyDisconnectedNodeModification(requestProcessGroupEntity.isDisconnectedNodeAcknowledged());
    }

    // collect every entity that will be updated, keyed to the revision that must be claimed for it
    final ParameterContextReferenceEntity requestParamContext = requestProcessGroupDTO.getParameterContext();
    final String requestGroupId = requestProcessGroupDTO.getId();
    final Map<ProcessGroupEntity, Revision> updatableProcessGroups = new HashMap<>();
    updatableProcessGroups.put(requestProcessGroupEntity, getRevision(requestProcessGroupEntity, requestGroupId));

    if (updateStrategy == ProcessGroupRecursivity.ALL_DESCENDANTS) {
        for (ProcessGroupEntity processGroupEntity : serviceFacade.getProcessGroups(requestGroupId, updateStrategy)) {
            final ProcessGroupDTO processGroupDTO = processGroupEntity.getComponent();
            final String processGroupId = processGroupDTO == null ? processGroupEntity.getId() : processGroupDTO.getId();
            // propagate the requested parameter context to each returned group that exposes a component
            if (processGroupDTO != null) {
                processGroupDTO.setParameterContext(requestParamContext);
            }
            updatableProcessGroups.put(processGroupEntity, getRevision(processGroupEntity, processGroupId));
        }
    }

    return withWriteLock(
            serviceFacade,
            requestProcessGroupEntity,
            new HashSet<>(updatableProcessGroups.values()),
            lookup -> {
                final NiFiUser user = NiFiUserUtils.getNiFiUser();

                for (final ProcessGroupEntity updatableGroupEntity : updatableProcessGroups.keySet()) {
                    final ProcessGroupDTO updatableGroupDto = updatableGroupEntity.getComponent();
                    final String groupId = updatableGroupDto == null ? updatableGroupEntity.getId() : updatableGroupDto.getId();

                    final Authorizable authorizable = lookup.getProcessGroup(groupId).getAuthorizable();
                    authorizable.authorize(authorizer, RequestAction.WRITE, user);

                    // NOTE(review): updatableGroupDto is treated as possibly null when deriving groupId above, yet it is
                    // dereferenced unconditionally on the next line and its (possibly null) value is also handed to
                    // verifyUpdateProcessGroup(...) and updateProcessGroup(...) below. Confirm whether getProcessGroups(...)
                    // can return an entity without a component and, if so, what the intended outcome is.

                    // Ensure that user has READ permission on current Parameter Context (if any) because user is un-binding.
                    final ParameterContextReferenceEntity referencedParamContext = updatableGroupDto.getParameterContext();
                    if (referencedParamContext != null) {
                        // Lookup the current Parameter Context and determine whether or not the Parameter Context is changing
                        final ProcessGroupEntity currentGroupEntity = serviceFacade.getProcessGroup(groupId);
                        final ProcessGroupDTO groupDto = currentGroupEntity.getComponent();
                        final ParameterContextReferenceEntity currentParamContext = groupDto.getParameterContext();
                        final String currentParamContextId = currentParamContext == null ? null : currentParamContext.getId();
                        final boolean parameterContextChanging = !Objects.equals(referencedParamContext.getId(), currentParamContextId);

                        // If Parameter Context is changing...
                        if (parameterContextChanging) {
                            // In order to bind to a Parameter Context, the user must have the READ policy to that Parameter Context.
                            if (referencedParamContext.getId() != null) {
                                lookup.getParameterContext(referencedParamContext.getId()).authorize(authorizer, RequestAction.READ, user);
                            }

                            // If currently referencing a Parameter Context, we must authorize that the user has READ permissions on the Parameter Context in order to un-bind to it.
                            if (currentParamContextId != null) {
                                lookup.getParameterContext(currentParamContextId).authorize(authorizer, RequestAction.READ, user);
                            }

                            // Because the user will be changing the behavior of any component in this group that is currently referencing any Parameter, we must ensure that the user has
                            // both READ and WRITE policies for each of those components.
                            for (final AffectedComponentEntity affectedComponentEntity : serviceFacade.getProcessorsReferencingParameter(groupId)) {
                                final Authorizable processorAuthorizable = lookup.getProcessor(affectedComponentEntity.getId()).getAuthorizable();
                                processorAuthorizable.authorize(authorizer, RequestAction.READ, user);
                                processorAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
                            }

                            for (final AffectedComponentEntity affectedComponentEntity : serviceFacade.getControllerServicesReferencingParameter(groupId)) {
                                final Authorizable serviceAuthorizable = lookup.getControllerService(affectedComponentEntity.getId()).getAuthorizable();
                                serviceAuthorizable.authorize(authorizer, RequestAction.READ, user);
                                serviceAuthorizable.authorize(authorizer, RequestAction.WRITE, user);
                            }
                        }
                    }
                }
            },
            () -> {
                // verify every proposed update before any of them is applied
                for (final ProcessGroupEntity entity : updatableProcessGroups.keySet()) {
                    serviceFacade.verifyUpdateProcessGroup(entity.getComponent());
                }
            },
            (revisions, entities) -> {
                ProcessGroupEntity responseEntity = null;

                for (Map.Entry<ProcessGroupEntity, Revision> entry : updatableProcessGroups.entrySet()) {
                    // update the process group
                    final Revision revision = entry.getValue();
                    final ProcessGroupDTO groupDTO = entry.getKey().getComponent();
                    final ProcessGroupEntity entity = serviceFacade.updateProcessGroup(revision, groupDTO);

                    // only the group that was addressed by the request is reported back to the caller
                    if (requestGroupId.equals(entity.getId())) {
                        responseEntity = entity;
                        populateRemainingProcessGroupEntityContent(responseEntity);

                        // prune response as necessary
                        if (responseEntity.getComponent() != null) {
                            responseEntity.getComponent().setContents(null);
                        }
                    }
                }

                return generateOkResponse(responseEntity).build();
            }
    );
}