protected void receiveEvent()

in taverna-workflowmodel-impl/src/main/java/org/apache/taverna/workflowmodel/impl/MergeImpl.java [121:173]


	/**
	 * Handles a data token arriving on one of this merge's input ports.
	 * <p>
	 * Every token is forwarded on the output with the index of the receiving
	 * port prepended to the token's own index. A token whose own index is
	 * empty is additionally recorded as that port's entry in the list kept
	 * for the token's owning process; once every entry of that list is
	 * filled, the list is registered with the list service, the bookkeeping
	 * entry for the process is dropped, and a token with an empty index
	 * carrying the registered list's id is sent.
	 *
	 * @param token
	 *            the incoming data token
	 * @param portName
	 *            name of the input port the token arrived on
	 * @throws WorkflowStructureException
	 *             if <code>portName</code> does not resolve to a port index,
	 *             if the port index lies beyond the list that was sized when
	 *             the first event for the owning process was seen, or if an
	 *             empty-index token was already recorded for the port
	 */
	protected void receiveEvent(WorkflowDataToken token, String portName) {
		// Validate the port before touching any shared state, so an event on
		// an unknown port cannot leave a freshly created entry behind in
		// partialOutputsByProcess.
		int portIndex = inputPortNameToIndex(portName);
		if (portIndex == -1)
			throw new WorkflowStructureException(
					"Received event on unknown port " + portName);

		String owningProcess = token.getOwningProcess();
		List<T2Reference> outputList;
		synchronized (partialOutputsByProcess) {
			outputList = partialOutputsByProcess.get(owningProcess);
			if (outputList == null) {
				// First event for this process: one null slot per input port
				int numPorts = getInputPorts().size();
				outputList = new ArrayList<>(nCopies(numPorts, (T2Reference) null));
				partialOutputsByProcess.put(owningProcess, outputList);
			}
		}

		// Forward the token with the port index prepended to its index
		int[] currentIndex = token.getIndex();
		int[] newIndex = new int[currentIndex.length + 1];
		newIndex[0] = portIndex;
		arraycopy(currentIndex, 0, newIndex, 1, currentIndex.length);
		InvocationContext context = token.getContext();
		output.sendEvent(new WorkflowDataToken(owningProcess,
				newIndex, token.getData(), context));

		if (currentIndex.length != 0)
			// Only empty-index tokens are added to the completion list
			return;

		// Add to completion list
		synchronized (outputList) {
			if (outputList.size() <= portIndex)
				// Ports changed after initiating running as our list is
				// smaller than portIndex
				throw new WorkflowStructureException(
						"Unexpected addition of output port " + portName
								+ " at " + portIndex);
			if (outputList.get(portIndex) != null)
				throw new WorkflowStructureException(
						"Already received completion for port " + portName
								+ " " + outputList.get(portIndex));

			outputList.set(portIndex, token.getData());
			if (!outputList.contains(null)) {
				// We're finished, let's register and send out the list
				ListService listService = context.getReferenceService()
						.getListService();
				IdentifiedList<T2Reference> registeredList = listService
						.registerList(outputList, context);
				WorkflowDataToken workflowDataToken = new WorkflowDataToken(
						owningProcess, new int[0], registeredList.getId(),
						context);
				synchronized (partialOutputsByProcess) {
					partialOutputsByProcess.remove(owningProcess);
				}
				output.sendEvent(workflowDataToken);
			}
		}
	}