in taverna-workflowmodel-api/src/main/java/org/apache/taverna/workflowmodel/processor/iteration/CrossProduct.java [54:104]
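	/*
	 * Receives a job on one input, caches it per owning process, and pushes
	 * out the cross product of that job with the jobs already cached for
	 * every other input.
	 */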
public synchronized void innerReceiveJob(int inputIndex, Job newJob) {
if (getChildCount() == 1) {
			/*
			 * There's only one input, so there is nothing to combine; push
			 * the job straight through.
			 */
pushJob(newJob);
return;
}
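		/*
		 * Lazily create the per-input job caches for this owning process the
		 * first time we see one of its jobs.
		 */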
if (!ownerToCache.containsKey(newJob.getOwningProcess())) {
List<Set<Job>> perInputCache = new ArrayList<>();
for (int i = 0; i < getChildCount(); i++)
perInputCache.add(new HashSet<Job>());
ownerToCache.put(newJob.getOwningProcess(), perInputCache);
}
		// Store the new job in the cache for its input index
List<Set<Job>> perInputCache = ownerToCache.get(newJob
.getOwningProcess());
perInputCache.get(inputIndex).add(newJob);
		/*
		 * Combine the new job with every combination of jobs already cached
		 * for the other inputs. We could make this a lot easier by
		 * restricting it to a single pair of inputs; that might be a saner
		 * way to go in the future...
		 */
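		/*
		 * Seed the working set with the cache for input 0, or with just the
		 * new job if it arrived on input 0 (so every emitted combination
		 * contains the new job).
		 */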
Set<Job> workingSet = perInputCache.get(0);
if (inputIndex == 0) {
workingSet = new HashSet<>();
workingSet.add(newJob);
}
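		// Fold each remaining input's cache into the cross product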
for (int i = 1; i < getChildCount(); i++) {
Set<Job> thisSet = perInputCache.get(i);
if (i == inputIndex) {
/*
* This is the cache for the new job, so we rewrite the set to a
* single element one containing only the newly submitted job
*/
thisSet = new HashSet<>();
thisSet.add(newJob);
}
workingSet = merge(workingSet, thisSet);
}
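		// Emit every newly formed combination downstream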
for (Job outputJob : workingSet)
pushJob(outputJob);
		/*
		 * If we've seen completions for all the other indexes, we no longer
		 * need to cache jobs for this index.
		 */
		if (canClearCache(inputIndex, newJob.getOwningProcess()))
			perInputCache.get(inputIndex).clear();
}
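
	/*
	 * Illustrative sketch only, not part of the original class: it shows the
	 * general shape of the cross-product merge used above, but over plain
	 * value lists instead of Taverna Jobs. The real merge(Set<Job>, Set<Job>)
	 * called above is defined elsewhere in this class; the name and generic
	 * signature below are hypothetical, and the sketch relies only on the
	 * java.util types already used in this file. For example,
	 * {[a], [b]} x {[x]} yields {[a, x], [b, x]}.
	 */
	private static <T> Set<List<T>> crossProductSketch(Set<List<T>> left,
			Set<List<T>> right) {
		Set<List<T>> result = new HashSet<>();
		for (List<T> l : left)
			for (List<T> r : right) {
				// Concatenate one element from each side; merge() presumably
				// performs the analogous combination on Jobs (an assumption,
				// since its body is outside this excerpt)
				List<T> combined = new ArrayList<>(l);
				combined.addAll(r);
				result.add(combined);
			}
		return result;
	}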