in taverna-workflowmodel-impl/src/main/java/org/apache/taverna/workflowmodel/impl/MergeImpl.java [121:173]
/**
 * Handles a data token arriving on one of this merge's input ports.
 * <p>
 * The token is forwarded on the output with its index prefixed by the
 * position of the receiving port. If the incoming token has an empty index,
 * its data reference is also stored in the per-process list of partial
 * outputs; once no slot of that list is {@code null}, the list is registered
 * through the list service of the context's reference service and a final
 * token with an empty index, carrying the registered list's id, is sent on
 * the output.
 *
 * @param token
 *            the incoming token; its owning process, index, data and context
 *            are read
 * @param portName
 *            name of the input port the token arrived on
 * @throws WorkflowStructureException
 *             if {@code portName} does not resolve to an input port, if the
 *             port's position lies beyond the per-process list, or if an
 *             empty-index token was already recorded for that port within
 *             the same owning process
 */
protected void receiveEvent(WorkflowDataToken token, String portName) {
    // Resolve the port before touching any shared state, so that an event
    // for an unknown port cannot leave a never-completed entry behind in
    // partialOutputsByProcess.
    int portIndex = inputPortNameToIndex(portName);
    if (portIndex == -1) {
        throw new WorkflowStructureException(
                "Received event on unknown port " + portName);
    }

    String owningProcess = token.getOwningProcess();
    List<T2Reference> outputList;
    synchronized (partialOutputsByProcess) {
        outputList = partialOutputsByProcess.get(owningProcess);
        if (outputList == null) {
            // One (initially null) slot per input port.
            int numPorts = getInputPorts().size();
            outputList = new ArrayList<>(nCopies(numPorts, (T2Reference) null));
            partialOutputsByProcess.put(owningProcess, outputList);
        }
    }

    // Forward the token with the port position prepended to its index.
    int[] currentIndex = token.getIndex();
    int[] newIndex = new int[currentIndex.length + 1];
    newIndex[0] = portIndex;
    arraycopy(currentIndex, 0, newIndex, 1, currentIndex.length);
    InvocationContext context = token.getContext();
    output.sendEvent(new WorkflowDataToken(owningProcess,
            newIndex, token.getData(), context));

    if (currentIndex.length != 0) {
        // Only empty-index tokens contribute to the completion list.
        return;
    }

    // Built under the lock, sent after it is released: the downstream call
    // must not run while this thread still holds the list's monitor.
    WorkflowDataToken completionToken = null;
    synchronized (outputList) {
        if (outputList.size() <= portIndex) {
            // Ports changed after initiating running as our list is
            // smaller than portIndex
            throw new WorkflowStructureException(
                    "Unexpected addition of output port " + portName
                            + " at " + portIndex);
        }
        if (outputList.get(portIndex) != null) {
            throw new WorkflowStructureException(
                    "Already received completion for port " + portName
                            + " " + outputList.get(portIndex));
        }
        outputList.set(portIndex, token.getData());
        if (!outputList.contains(null)) {
            // We're finished, let's register the list
            ListService listService = context.getReferenceService()
                    .getListService();
            IdentifiedList<T2Reference> registeredList = listService
                    .registerList(outputList, context);
            completionToken = new WorkflowDataToken(owningProcess,
                    new int[0], registeredList.getId(), context);
            synchronized (partialOutputsByProcess) {
                partialOutputsByProcess.remove(owningProcess);
            }
        }
    }

    if (completionToken != null) {
        output.sendEvent(completionToken);
    }
}