in taverna-execution-local/src/main/java/org/apache/taverna/platform/execution/impl/local/LocalExecutionMonitor.java [161:285]
public void registerNode(Object dataflowObject, String[] owningProcess,
Set<MonitorableProperty<?>> properties) {
if (dataflowObject instanceof Dataflow) {
Dataflow dataflow = (Dataflow) dataflowObject;
Invocation parentInvocation = invocations
.get(getParentInvocationId(owningProcess));
WorkflowReport report = (WorkflowReport) reports
.get(getReportId(owningProcess));
report.setStartedDate(new Date());
Invocation invocation = new Invocation(
getInvocationName(owningProcess), parentInvocation, report);
if (parentInvocation == null) {
if (DataBundles.hasInputs(dataBundle)) {
try {
invocation.setInputs(DataBundles.getPorts(DataBundles
.getInputs(dataBundle)));
} catch (IOException e) {
logger.log(WARNING, "Error setting input ports", e);
}
}
try {
Path outputs = DataBundles.getOutputs(dataBundle);
DataflowResultListener dataflowResultListener = new DataflowResultListener(
outputs);
for (DataflowOutputPort dataflowOutputPort : dataflow
.getOutputPorts()) {
String portName = dataflowOutputPort.getName();
Path portPath = DataBundles.getPort(outputs, portName);
invocation.setOutput(portName, portPath);
dataflowOutputPort
.addResultListener(dataflowResultListener);
}
} catch (IOException e) {
logger.log(WARNING, "Error setting output ports", e);
}
invocations.put(getInvocationId(owningProcess), invocation);
} else {
invocation.setInputs(parentInvocation.getInputs());
NestedDataflowResultListener resultListener = new NestedDataflowResultListener(
invocation);
for (DataflowOutputPort dataflowOutputPort : dataflow
.getOutputPorts()) {
dataflowOutputPort.addResultListener(resultListener);
}
invocations.put(getInvocationId(owningProcess), invocation);
}
} else if (dataflowObject instanceof Processor) {
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
report.setStartedDate(new Date());
if (report instanceof LocalProcessorReport)
((LocalProcessorReport) report).addProperties(properties);
} else if (dataflowObject instanceof Activity) {
Activity<?> activity = (Activity<?>) dataflowObject;
invocationToActivity.put(owningProcess[owningProcess.length - 1],
String.valueOf(activity.hashCode()));
} else if (dataflowObject instanceof DispatchJobEvent) {
DispatchJobEvent jobEvent = (DispatchJobEvent) dataflowObject;
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
// create a new invocation
Invocation parentInvocation;
Invocation invocation;
if (report instanceof ActivityReport) {
parentInvocation = invocations
.get(getParentInvocationId(owningProcess)
+ indexToString(jobEvent.getIndex()));
invocation = new Invocation(getInvocationName(owningProcess),
jobEvent.getIndex(), parentInvocation, report);
invocations.put(getInvocationId(owningProcess), invocation);
} else {
parentInvocation = invocations
.get(getParentInvocationId(owningProcess));
invocation = new Invocation(getInvocationName(owningProcess)
+ indexToString(jobEvent.getIndex()),
jobEvent.getIndex(), parentInvocation, report);
invocations.put(getInvocationId(owningProcess)
+ indexToString(jobEvent.getIndex()), invocation);
}
// set the invocation inputs
try {
for (Entry<String, T2Reference> inputInfo : jobEvent.getData()
.entrySet()) {
invocation.setInput(
inputInfo.getKey(),
getIntermediate(inputInfo.getValue(),
jobEvent.getContext()));
}
} catch (IOException | URISyntaxException e) {
logger.log(WARNING, "Error saving intermediate inputs for "
+ jobEvent.getOwningProcess(), e);
}
} else if (dataflowObject instanceof DispatchResultEvent) {
DispatchResultEvent resultEvent = (DispatchResultEvent) dataflowObject;
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
// find the invocation
Invocation invocation;
if (report instanceof ActivityReport)
invocation = invocations.remove(getInvocationId(owningProcess));
else
invocation = invocations.remove(getInvocationId(owningProcess)
+ indexToString(resultEvent.getIndex()));
if (invocation == null) {
logger.log(SEVERE, "Can't find invocation for owning process "
+ owningProcess);
return;
}
// set the invocation outputs
try {
for (Entry<String, T2Reference> outputInfo : resultEvent.getData()
.entrySet()) {
invocation.setOutput(
outputInfo.getKey(),
getIntermediate(outputInfo.getValue(),
resultEvent.getContext()));
}
} catch (IOException | URISyntaxException e) {
logger.log(WARNING, "Error saving intermediate outputs for "
+ resultEvent.getOwningProcess(), e);
}
invocation.setCompletedDate(new Date());
}
}