in taverna-workflowmodel-extensions/src/main/java/org/apache/taverna/workflowmodel/processor/dispatch/layers/Invoke.java [128:206]
/**
 * Launches one asynchronous invocation of {@code activity} for the given
 * job event.
 * <p>
 * The method pushes a fresh process id onto the job's owning process and
 * uses the resulting owning-process string as the invocation process
 * identifier. It registers both the activity and the job event with the
 * monitor under that identifier, optionally reports an
 * {@link InvocationStartedProvenanceItem}, renames the job's inputs
 * according to the activity's input port mapping and finally starts the
 * activity with an {@code InvokeCallBack} as its callback.
 *
 * @param jobEvent
 *            the job whose data is fed to the activity; also supplies the
 *            invocation context and the parent owning process
 * @param activity
 *            the asynchronous activity to execute
 */
protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
    // Register this invocation with the monitor, starting with empty
    // property sets for both the activity and the job event.
    final String invocationProcessIdentifier = jobEvent
            .pushOwningProcess(getNextProcessID()).getOwningProcess();
    monMan.registerNode(activity, invocationProcessIdentifier,
            new HashSet<MonitorableProperty<?>>());
    monMan.registerNode(jobEvent, invocationProcessIdentifier,
            new HashSet<MonitorableProperty<?>>());

    // The invocation context supplies the reference service handed to the
    // callback and, optionally, a provenance reporter.
    final InvocationContext invocationContext = jobEvent.getContext();
    final ReferenceService refService = invocationContext.getReferenceService();
    final ProvenanceReporter reporter = invocationContext.getProvenanceReporter();

    // Only look for the intermediate provenance when there is a reporter
    // to receive the resulting item.
    final IntermediateProvenance provenanceLayer =
            (reporter == null) ? null : findIntermediateProvenance();
    if (provenanceLayer != null) {
        InvocationStartedProvenanceItem startedItem = new InvocationStartedProvenanceItem();
        // NOTE(review): the iteration item is dereferenced below without a
        // null check - confirm getIterationProvItem never returns null.
        IterationProvenanceItem iterationItem = provenanceLayer.getIterationProvItem(jobEvent);
        startedItem.setIdentifier(UUID.randomUUID().toString());
        startedItem.setActivity(activity);
        // Process id is the job's own (parent) owning process; the
        // invocation process id is the one pushed above.
        startedItem.setProcessId(jobEvent.getOwningProcess());
        startedItem.setInvocationProcessId(invocationProcessIdentifier);
        startedItem.setParentId(iterationItem.getIdentifier());
        startedItem.setWorkflowId(iterationItem.getWorkflowId());
        startedItem.setInvocationStarted(new Date());
        reporter.addProvenanceItem(startedItem);
    }

    // Re-key the job's data by activity input port name; inputs that have
    // no entry in the activity's port mapping are dropped.
    Map<String, T2Reference> inputData = new HashMap<>();
    for (Map.Entry<String, T2Reference> jobInput : jobEvent.getData().entrySet()) {
        String activityPort = activity.getInputPortMapping().get(jobInput.getKey());
        if (activityPort == null) {
            continue;
        }
        inputData.put(activityPort, jobInput.getValue());
    }

    // Callback that receives results, completions and failures from the
    // activity.
    AsynchronousActivityCallback callback = new InvokeCallBack(jobEvent,
            refService, invocationProcessIdentifier, activity);

    if (!(activity instanceof MonitorableAsynchronousActivity<?>)) {
        // Plain asynchronous activity: just run it with the renamed inputs.
        activity.executeAsynch(inputData, callback);
        return;
    }

    // Monitorable activity: launch the job first, then attach the
    // properties it returns to the node addressed by the ':'-separated
    // invocation process identifier.
    MonitorableAsynchronousActivity<?> monitorable = (MonitorableAsynchronousActivity<?>) activity;
    Set<MonitorableProperty<?>> monitoredProps = monitorable
            .executeAsynchWithMonitoring(inputData, callback);
    monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"), monitoredProps);
}