public void innerReceiveJob()

in taverna-workflowmodel-api/src/main/java/org/apache/taverna/workflowmodel/processor/iteration/PrefixDotProduct.java [45:109]


	public void innerReceiveJob(int inputIndex, Job newJob) {
		String owningProcess = newJob.getOwningProcess();
		TreeCache[] caches;
		synchronized (ownerToCache) {
			caches = ownerToCache.get(owningProcess);
			// Create the caches if not already initialized
			if (caches == null) {
				caches = new TreeCache[getChildCount()];
				for (int i = 0; i < getChildCount(); i++)
					caches[i] = new TreeCache();
				ownerToCache.put(owningProcess, caches);
			}
		}

		// Store the job
		caches[inputIndex].insertJob(newJob);

		/*
		 * If this job came in on index 0 we have to find all jobs in the cache
		 * for index 1 which have the index array as a prefix. Fortunately this
		 * is quite easy due to the tree structure of the cache, we can just ask
		 * for all nodes in the cache with that index.
		 */
		if (inputIndex == 0) {
			int[] prefixIndexArray = newJob.getIndex();
			List<Job> matchingJobs;
			synchronized (caches[1]) {
				// Match all jobs and remove them so other calls can't produce
				// duplicates
				matchingJobs = caches[1].jobsWithPrefix(prefixIndexArray);
				caches[1].cut(prefixIndexArray);
			}
			for (Job job : matchingJobs) {
				Map<String, T2Reference> newDataMap = new HashMap<>();
				newDataMap.putAll(newJob.getData());
				newDataMap.putAll(job.getData());
				Job mergedJob = new Job(owningProcess, job.getIndex(),
						newDataMap, newJob.getContext());
				pushJob(mergedJob);
			}
		}

		/*
		 * If the job came in on index 1 we have to find the job on index 0 that
		 * matches the first 'n' indices, where 'n' is determined by the depth
		 * of jobs on the cache for index 0.
		 */
		else if (inputIndex == 1) {
			// Only act if we've received jobs on the cache at index 0
			if (caches[0].getIndexLength() > 0) {
				int[] prefix = new int[caches[0].getIndexLength()];
				for (int i = 0; i < prefix.length; i++)
					prefix[i] = newJob.getIndex()[i];
				Job j = caches[0].get(prefix);
				if (j != null) {
					Map<String, T2Reference> newDataMap = new HashMap<>();
					newDataMap.putAll(j.getData());
					newDataMap.putAll(newJob.getData());
					Job mergedJob = new Job(owningProcess, newJob.getIndex(),
							newDataMap, newJob.getContext());
					pushJob(mergedJob);
				}
			}
		}
	}