in taverna-execution-local/src/main/java/org/apache/taverna/platform/execution/impl/local/WorkflowToDataflowMapper.java [434:484]
private void addDataLinks(Workflow workflow, Dataflow dataflow)
throws EditException {
for (DataLink dataLink : workflow.getDataLinks()) {
ReceiverPort receiverPort = dataLink.getSendsTo();
SenderPort senderPort = dataLink.getReceivesFrom();
EventForwardingOutputPort source = outputPorts.get(senderPort);
EventHandlingInputPort sink = inputPorts.get(receiverPort);
Integer mergePosition = dataLink.getMergePosition();
if (mergePosition != null) {
if (!merges.containsKey(receiverPort)) {
Merge merge = edits.createMerge(dataflow);
edits.getAddMergeEdit(dataflow, merge).doEdit();
merges.put(receiverPort, merge);
}
Merge merge = merges.get(receiverPort);
// create merge input port
MergeInputPort mergeInputPort = edits.createMergeInputPort(
merge, "input" + mergePosition, sink.getDepth());
// add it to the correct position in the merge
@SuppressWarnings("unchecked")
List<MergeInputPort> mergeInputPorts = (List<MergeInputPort>) merge
.getInputPorts();
if (mergePosition > mergeInputPorts.size())
mergeInputPorts.add(mergeInputPort);
else
mergeInputPorts.add(mergePosition, mergeInputPort);
// connect a datalink into the merge
Datalink datalinkIn = edits.createDatalink(source,
mergeInputPort);
edits.getConnectDatalinkEdit(datalinkIn).doEdit();
// check if the merge output has been connected
EventForwardingOutputPort mergeOutputPort = merge
.getOutputPort();
if (mergeOutputPort.getOutgoingLinks().isEmpty()) {
Datalink datalinkOut = edits.createDatalink(
merge.getOutputPort(), sink);
edits.getConnectDatalinkEdit(datalinkOut).doEdit();
} else if (mergeOutputPort.getOutgoingLinks().size() == 1) {
if (mergeOutputPort.getOutgoingLinks().iterator().next()
.getSink() != sink)
throw new EditException(
"Cannot add a different sinkPort to a Merge that already has one defined");
} else
throw new EditException(
"The merge instance cannot have more that 1 outgoing Datalink");
} else {
Datalink datalink = edits.createDatalink(source, sink);
edits.getConnectDatalinkEdit(datalink).doEdit();
}
}
}