in taverna-scufl2-api/src/main/java/org/apache/taverna/scufl2/api/common/Scufl2Tools.java [415:480]
public ProcessorSplit splitProcessors(Collection<Processor> processors,
        Processor splitPoint) {
    /*
     * Partition the processors relative to splitPoint:
     *  - upStream:   everything splitPoint transitively depends on,
     *  - downStream: everything that transitively depends on splitPoint,
     *  - undecided:  members of 'processors' that are neither of the above
     *                nor the split point itself.
     * A dependency is either a blocking control link or a data link that
     * connects two processor ports; data links whose other end is not a
     * processor port are skipped.
     */

    // --- Walk against the direction of the links (upstream) ---
    Set<Processor> upStream = new HashSet<>();
    Set<Processor> pendingUp = new HashSet<>();
    pendingUp.add(splitPoint);
    while (!pendingUp.isEmpty()) {
        // Take an arbitrary pending processor; visiting order is irrelevant
        Processor current = pendingUp.iterator().next();
        pendingUp.remove(current);

        // Processors that must finish before 'current' may run
        for (BlockingControlLink link : controlLinksBlocking(current)) {
            Processor blocker = link.getUntilFinished();
            // add() returns true only for a newly discovered processor
            if (upStream.add(blocker)) {
                pendingUp.add(blocker);
            }
        }

        // Processors feeding data into any input port of 'current'
        for (InputProcessorPort input : current.getInputPorts()) {
            for (DataLink link : datalinksTo(input)) {
                SenderPort sender = link.getReceivesFrom();
                if (sender instanceof OutputProcessorPort) {
                    Processor feeder = ((OutputProcessorPort) sender).getParent();
                    if (upStream.add(feeder)) {
                        pendingUp.add(feeder);
                    }
                }
            }
        }
    }

    // --- Walk along the direction of the links (downstream) ---
    Set<Processor> downStream = new HashSet<>();
    Set<Processor> pendingDown = new HashSet<>();
    pendingDown.add(splitPoint);
    while (!pendingDown.isEmpty()) {
        Processor current = pendingDown.iterator().next();
        pendingDown.remove(current);

        // Processors that are blocked until 'current' has finished
        for (BlockingControlLink link : controlLinksWaitingFor(current)) {
            Processor blocked = link.getBlock();
            if (downStream.add(blocked)) {
                pendingDown.add(blocked);
            }
        }

        // Processors consuming data from any output port of 'current'
        for (OutputProcessorPort output : current.getOutputPorts()) {
            for (DataLink link : datalinksFrom(output)) {
                ReceiverPort receiver = link.getSendsTo();
                if (receiver instanceof InputProcessorPort) {
                    Processor consumer = ((InputProcessorPort) receiver).getParent();
                    if (downStream.add(consumer)) {
                        pendingDown.add(consumer);
                    }
                }
            }
        }
    }

    // Whatever was not reached in either direction remains undecided
    Set<Processor> undecided = new HashSet<>(processors);
    undecided.remove(splitPoint);
    undecided.removeAll(upStream);
    undecided.removeAll(downStream);

    return new ProcessorSplit(splitPoint, upStream, downStream, undecided);
}