in taverna-dataflow-activity/src/main/java/org/apache/taverna/activities/dataflow/DataflowActivity.java [73:139]
public void executeAsynch(final Map<String, T2Reference> data,
		final AsynchronousActivityCallback callback) {
	// Executes the nested dataflow held by this activity: a workflow instance
	// facade is created for it, every entry of 'data' is pushed to the facade
	// port named by the entry key, and the depth-0 result tokens are handed to
	// 'callback' once the facade reports State.completed. Any failure is
	// reported through callback.fail(...).
	callback.requestRun(new Runnable() {
		// Top-level (index length 0) results, keyed by output port name.
		// NOTE(review): this is a plain HashMap written by the result listener
		// and read by the facade listener; confirm the facade does not invoke
		// those listeners concurrently.
		private final Map<String, T2Reference> outputData = new HashMap<String, T2Reference>();

		@Override
		public void run() {
			final WorkflowInstanceFacade facade;
			try {
				facade = getEdits().createWorkflowInstanceFacade(dataflow, callback.getContext(),
						callback.getParentProcessIdentifier());
			} catch (InvalidDataflowException ex) {
				callback.fail("Invalid workflow", ex);
				return;
			}

			// Collects only complete values; tokens with a non-empty index are
			// ignored here.
			final ResultListener rl = new ResultListener() {
				@Override
				public void resultTokenProduced(WorkflowDataToken dataToken, String port) {
					if (dataToken.getIndex().length == 0) {
						outputData.put(port, dataToken.getData());
					}
				}
			};

			final FacadeListener fl = new FacadeListener() {
				@Override
				public void workflowFailed(WorkflowInstanceFacade facade,
						String message, Throwable t) {
					callback.fail(message, t);
				}

				@Override
				public void stateChange(WorkflowInstanceFacade facade,
						State oldState, State newState) {
					if (newState == State.completed) {
						// Detach before delivering so no further events reach
						// the callback after the results have been handed over.
						facade.removeResultListener(rl);
						facade.removeFacadeListener(this);
						callback.receiveResult(outputData, new int[]{});
					}
				}
			};

			// Listeners are registered before the facade is fired and fed.
			facade.addResultListener(rl);
			facade.addFacadeListener(fl);
			facade.fire();

			for (Map.Entry<String, T2Reference> entry : data.entrySet()) {
				try {
					WorkflowDataToken token = new WorkflowDataToken(callback
							.getParentProcessIdentifier(), new int[] {}, entry.getValue(),
							callback.getContext());
					facade.pushData(token, entry.getKey());
				} catch (TokenOrderException e) {
					// The run has failed: detach the listeners so that no second
					// fail/receiveResult reaches the callback, report the failure
					// once, and stop pushing the remaining inputs.
					facade.removeResultListener(rl);
					facade.removeFacadeListener(fl);
					callback.fail("Failed to push data into facade", e);
					return;
				}
			}
		}
	});
}