public synchronized void innerReceiveJob()

in taverna-workflowmodel-api/src/main/java/org/apache/taverna/workflowmodel/processor/iteration/CrossProduct.java [54:104]


	/**
	 * Accepts a job arriving on one input and pushes every combination of
	 * that job with the jobs already cached for the other inputs of the same
	 * owning process.
	 * <p>
	 * Jobs are cached per owning process, with one set per input index. When
	 * there is only a single child the job is pushed straight through and
	 * nothing is cached. The method is {@code synchronized}, so calls on the
	 * same instance are serialized.
	 *
	 * @param inputIndex
	 *            index of the input on which the job arrived
	 * @param newJob
	 *            the job that has just been received
	 */
	public synchronized void innerReceiveJob(int inputIndex, Job newJob) {
		// A lone input has nothing to be combined with: forward the job as is.
		if (getChildCount() == 1) {
			pushJob(newJob);
			return;
		}

		final String owner = newJob.getOwningProcess();

		// First job seen for this owning process: set up one empty set per
		// input.
		if (!ownerToCache.containsKey(owner)) {
			List<Set<Job>> freshCaches = new ArrayList<>();
			int slot = 0;
			while (slot < getChildCount()) {
				freshCaches.add(new HashSet<Job>());
				slot++;
			}
			ownerToCache.put(owner, freshCaches);
		}

		// Remember the incoming job against the input it arrived on.
		final List<Set<Job>> caches = ownerToCache.get(owner);
		caches.get(inputIndex).add(newJob);

		/*
		 * Only combinations that involve the new job are wanted, so the
		 * position of the new job's own input contributes a single-element
		 * set holding just that job rather than its whole cache. Every other
		 * position contributes its full cache. The sets are folded together,
		 * in input order, with the merge helper.
		 */
		final Set<Job> onlyNewJob = new HashSet<>();
		onlyNewJob.add(newJob);

		Set<Job> combined = (inputIndex == 0) ? onlyNewJob : caches.get(0);
		for (int position = 1; position < getChildCount(); position++) {
			Set<Job> candidates = (position == inputIndex) ? onlyNewJob
					: caches.get(position);
			combined = merge(combined, candidates);
		}

		// Emit every job produced by the fold.
		for (Job outputJob : combined) {
			pushJob(outputJob);
		}

		/*
		 * Once completions have been seen for all the other indexes, jobs on
		 * this index no longer need to be kept, so drop them.
		 */
		if (canClearCache(inputIndex, owner)) {
			caches.get(inputIndex).clear();
		}
	}