public ProcessorSplit splitProcessors()

in taverna-scufl2-api/src/main/java/org/apache/taverna/scufl2/api/common/Scufl2Tools.java [415:480]


	/**
	 * Partition processors relative to a chosen split point.
	 * <p>
	 * Starting at <code>splitPoint</code>, the upstream set is collected by
	 * repeatedly following, for each visited processor, the links returned
	 * by {@link #controlLinksBlocking(Processor)} (taking
	 * <code>getUntilFinished()</code>) and the data links arriving at its
	 * input ports whose source is an {@link OutputProcessorPort} (taking that
	 * port's parent). The downstream set is collected the same way in the
	 * opposite direction, using {@link #controlLinksWaitingFor(Processor)}
	 * (taking <code>getBlock()</code>) and the data links leaving its output
	 * ports whose sink is an {@link InputProcessorPort}.
	 * <p>
	 * Data links whose other end is not a processor port are ignored.
	 * Traversal is not restricted to <code>processors</code>; that collection
	 * is only used to compute the undecided remainder.
	 *
	 * @param processors
	 *            the processors from which the undecided set is derived
	 * @param splitPoint
	 *            the processor from which both traversals start
	 * @return a {@link ProcessorSplit} built from the split point, the
	 *         upstream set, the downstream set, and every member of
	 *         <code>processors</code> that is neither the split point nor in
	 *         either of those sets
	 */
	public ProcessorSplit splitProcessors(Collection<Processor> processors,
			Processor splitPoint) {
		// Work list shared by both traversals; it is empty again once the
		// upstream pass has finished.
		Set<Processor> pending = new HashSet<>();

		// Pass 1: walk against the direction of the links.
		Set<Processor> upStream = new HashSet<>();
		pending.add(splitPoint);
		while (!pending.isEmpty()) {
			Processor current = pending.iterator().next();
			pending.remove(current);

			for (BlockingControlLink link : controlLinksBlocking(current)) {
				Processor predecessor = link.getUntilFinished();
				// add() reports whether the processor was newly discovered
				if (upStream.add(predecessor)) {
					pending.add(predecessor);
				}
			}

			for (InputProcessorPort inPort : current.getInputPorts()) {
				for (DataLink link : datalinksTo(inPort)) {
					SenderPort sender = link.getReceivesFrom();
					if (sender instanceof OutputProcessorPort) {
						Processor predecessor = ((OutputProcessorPort) sender)
								.getParent();
						if (upStream.add(predecessor)) {
							pending.add(predecessor);
						}
					}
				}
			}
		}

		// Pass 2: restart at the split point and walk along the links.
		Set<Processor> downStream = new HashSet<>();
		pending.add(splitPoint);
		while (!pending.isEmpty()) {
			Processor current = pending.iterator().next();
			pending.remove(current);

			for (BlockingControlLink link : controlLinksWaitingFor(current)) {
				Processor successor = link.getBlock();
				if (downStream.add(successor)) {
					pending.add(successor);
				}
			}

			for (OutputProcessorPort outPort : current.getOutputPorts()) {
				for (DataLink link : datalinksFrom(outPort)) {
					ReceiverPort receiver = link.getSendsTo();
					if (receiver instanceof InputProcessorPort) {
						Processor successor = ((InputProcessorPort) receiver)
								.getParent();
						if (downStream.add(successor)) {
							pending.add(successor);
						}
					}
				}
			}
		}

		// Whatever was given but never reached in either direction.
		Set<Processor> undecided = new HashSet<>(processors);
		undecided.remove(splitPoint);
		undecided.removeAll(upStream);
		undecided.removeAll(downStream);

		return new ProcessorSplit(splitPoint, upStream, downStream, undecided);
	}