protected void invoke()

in taverna-workflowmodel-extensions/src/main/java/org/apache/taverna/workflowmodel/processor/dispatch/layers/Invoke.java [128:206]


	/**
	 * Starts an asynchronous activity for the given job event.
	 * <p>
	 * The method proceeds in this order:
	 * <ol>
	 * <li>derives a fresh invocation process identifier from the job event and
	 * registers both the activity and the job event with the monitor under
	 * that identifier;</li>
	 * <li>when the invocation context supplies a provenance reporter and an
	 * intermediate provenance is found, reports an
	 * {@link InvocationStartedProvenanceItem} for this invocation;</li>
	 * <li>renames the job's inputs according to the activity's input port
	 * mapping, leaving out inputs that have no mapping;</li>
	 * <li>runs the activity with an {@link InvokeCallBack}; if the activity
	 * is a {@link MonitorableAsynchronousActivity}, the properties it returns
	 * are added to the monitor node of this invocation.</li>
	 * </ol>
	 *
	 * @param jobEvent
	 *            the job to run; provides the input data, the owning process
	 *            and the invocation context
	 * @param activity
	 *            the asynchronous activity to execute
	 */
	protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
		// Identifier under which this single invocation is tracked by the monitor
		final String invocationProcessIdentifier = jobEvent
				.pushOwningProcess(getNextProcessID())
				.getOwningProcess();
		monMan.registerNode(activity, invocationProcessIdentifier,
				new HashSet<MonitorableProperty<?>>());
		monMan.registerNode(jobEvent, invocationProcessIdentifier,
				new HashSet<MonitorableProperty<?>>());

		// The reference service of this invocation is handed on to the callback
		InvocationContext context = jobEvent.getContext();
		final ReferenceService refService = context.getReferenceService();

		/*
		 * Report the start of the invocation, but only when both a reporter
		 * and an intermediate provenance are available. The intermediate
		 * provenance is looked up only if a reporter exists.
		 */
		ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
		IntermediateProvenance intermediateProvenance = (provenanceReporter == null)
				? null
				: findIntermediateProvenance();
		if (intermediateProvenance != null) {
			InvocationStartedProvenanceItem startedItem = new InvocationStartedProvenanceItem();
			IterationProvenanceItem iterationItem = intermediateProvenance
					.getIterationProvItem(jobEvent);
			startedItem.setIdentifier(UUID.randomUUID().toString());
			startedItem.setActivity(activity);
			// Note: the process id is the job's own, not the pushed invocation id
			startedItem.setProcessId(jobEvent.getOwningProcess());
			startedItem.setInvocationProcessId(invocationProcessIdentifier);
			startedItem.setParentId(iterationItem.getIdentifier());
			startedItem.setWorkflowId(iterationItem.getWorkflowId());
			startedItem.setInvocationStarted(new Date());
			provenanceReporter.addProvenanceItem(startedItem);
		}

		/*
		 * Build the activity's input map: each job input is stored under the
		 * name the activity's input port mapping gives it; inputs without a
		 * mapping are dropped.
		 */
		Map<String, T2Reference> inputData = new HashMap<>();
		for (String jobPortName : jobEvent.getData().keySet()) {
			String activityPortName = activity.getInputPortMapping().get(jobPortName);
			if (activityPortName == null) {
				continue;
			}
			T2Reference reference = jobEvent.getData().get(jobPortName);
			inputData.put(activityPortName, reference);
		}

		/*
		 * Callback through which the activity hands results, completions and
		 * failures back to this layer
		 */
		AsynchronousActivityCallback callback = new InvokeCallBack(jobEvent,
				refService, invocationProcessIdentifier, activity);

		if (!(activity instanceof MonitorableAsynchronousActivity<?>)) {
			// Plain activity: just run it with the renamed inputs and the callback
			activity.executeAsynch(inputData, callback);
			return;
		}

		/*
		 * Monitorable activity: launch the job first, then push the
		 * properties it returns into the monitor node of this invocation
		 */
		MonitorableAsynchronousActivity<?> monitorable = (MonitorableAsynchronousActivity<?>) activity;
		Set<MonitorableProperty<?>> monitoredProperties = monitorable
				.executeAsynchWithMonitoring(inputData, callback);
		monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"),
				monitoredProperties);
	}