private void addDataLinks()

in taverna-execution-local/src/main/java/org/apache/taverna/platform/execution/impl/local/WorkflowToDataflowMapper.java [434:484]


	private void addDataLinks(Workflow workflow, Dataflow dataflow)
			throws EditException {
		for (DataLink dataLink : workflow.getDataLinks()) {
			ReceiverPort receiverPort = dataLink.getSendsTo();
			SenderPort senderPort = dataLink.getReceivesFrom();
			EventForwardingOutputPort source = outputPorts.get(senderPort);
			EventHandlingInputPort sink = inputPorts.get(receiverPort);
			Integer mergePosition = dataLink.getMergePosition();
			if (mergePosition != null) {
				if (!merges.containsKey(receiverPort)) {
					Merge merge = edits.createMerge(dataflow);
					edits.getAddMergeEdit(dataflow, merge).doEdit();
					merges.put(receiverPort, merge);
				}
				Merge merge = merges.get(receiverPort);
				// create merge input port
				MergeInputPort mergeInputPort = edits.createMergeInputPort(
						merge, "input" + mergePosition, sink.getDepth());
				// add it to the correct position in the merge
				@SuppressWarnings("unchecked")
				List<MergeInputPort> mergeInputPorts = (List<MergeInputPort>) merge
						.getInputPorts();
				if (mergePosition > mergeInputPorts.size())
					mergeInputPorts.add(mergeInputPort);
				else
					mergeInputPorts.add(mergePosition, mergeInputPort);
				// connect a datalink into the merge
				Datalink datalinkIn = edits.createDatalink(source,
						mergeInputPort);
				edits.getConnectDatalinkEdit(datalinkIn).doEdit();
				// check if the merge output has been connected
				EventForwardingOutputPort mergeOutputPort = merge
						.getOutputPort();
				if (mergeOutputPort.getOutgoingLinks().isEmpty()) {
					Datalink datalinkOut = edits.createDatalink(
							merge.getOutputPort(), sink);
					edits.getConnectDatalinkEdit(datalinkOut).doEdit();
				} else if (mergeOutputPort.getOutgoingLinks().size() == 1) {
					if (mergeOutputPort.getOutgoingLinks().iterator().next()
							.getSink() != sink)
						throw new EditException(
								"Cannot add a different sinkPort to a Merge that already has one defined");
				} else
					throw new EditException(
							"The merge instance cannot have more that 1 outgoing Datalink");
			} else {
				Datalink datalink = edits.createDatalink(source, sink);
				edits.getConnectDatalinkEdit(datalink).doEdit();
			}
		}
	}