in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/WorkflowDataProcessor.java [117:264]
/**
 * Stores the collected workflow-level data trees of a finished dataflow run
 * and records the corresponding dataflow invocation.
 * <p>
 * For every node in {@code workflowDataTrees} whose workflow ID matches the
 * completed workflow, either a collection record (list nodes) or a port
 * binding (all other nodes) is written through the provenance writer. The
 * values of top-level nodes (index {@code "[]"}) that belong to the
 * completing process are remembered and afterwards registered as the input
 * and output data bindings of a new {@code DataflowInvocation}.
 * <p>
 * {@code SQLException}s raised while writing are logged and processing
 * continues with the next item; they are not propagated to the caller.
 *
 * @param completeEvent
 *            event signalling the end of the dataflow run; supplies the
 *            workflow ID, process ID, parent ID, end time and final state
 * @param workflowRunId
 *            identifier of the workflow run the stored data belongs to
 */
public void processTrees(DataflowRunComplete completeEvent,
        String workflowRunId) {
    String workflowId = completeEvent.getWorkflowId();
    String processId = completeEvent.getProcessId();
    logger.debug("processing output trees");

    // "/i:<portName>" or "/o:<portName>" -> value of the top-level node
    Map<String, String> workflowPortData = new HashMap<>();

    // External name of the workflow, used as processor name of every port
    // binding. Resolved lazily on the first leaf node and then reused, so
    // the lookup is not repeated for each node.
    String workflowExternalName = null;

    for (Map.Entry<String, List<WorkflowDataNode>> entry : workflowDataTrees
            .entrySet()) {
        String portName = entry.getKey();
        logger.debug("storing tree for var "+portName+" in workflow with ID "+workflowId+" and instance "+workflowRunId);

        for (WorkflowDataNode node : entry.getValue()) {
            try {
                // Only nodes of the workflow that just completed are stored
                if (!node.getWorkflowID().equals(workflowId)) {
                    continue;
                }

                if (node.getIndex().equals("[]")) {
                    // Top-level workflow input/output: skip values that
                    // were produced by a different process
                    if (!node.getProcessId().equals(processId)) {
                        continue;
                    }
                    String portKey = (node.isInputPort() ? "/i:" : "/o:")
                            + node.getPortName();
                    workflowPortData.put(portKey, node.getValue());
                }

                WorkflowDataNode parent = node.getParent();
                if (node.isList) {
                    logger.debug("creating collection entry for "
                            + node.getValue() + " with index "
                            + node.getIndex());
                    if (parent != null) {
                        logger.debug(" and parent " + parent.getIndex());
                    }
                    // Write a collection record; a root collection has no
                    // parent collection reference
                    getPw().addCollection(workflowId, node.getValue(),
                            parent == null ? null : parent.getValue(),
                            node.getIndex(), portName, workflowRunId);
                } else {
                    logger.debug("creating PortBinding for "
                            + node.getValue() + " with index "
                            + node.getIndex());
                    if (workflowExternalName == null) {
                        workflowExternalName = getPq().getWorkflow(
                                workflowId).getExternalName();
                    }

                    PortBinding vb = new PortBinding();
                    vb.setWorkflowId(workflowId);
                    vb.setWorkflowRunId(workflowRunId);
                    vb.setProcessorName(workflowExternalName);
                    // vb.setValueType(); // TODO not sure what to set this to
                    vb.setPortName(portName);
                    vb.setIteration(node.getIndex());
                    vb.setValue(node.getValue());
                    if (parent != null) {
                        logger.debug(" in collection " + parent.getValue()
                                + " with index " + parent.getIndex());
                        vb.setCollIDRef(parent.getValue());
                        vb.setPositionInColl(node.getRelativePosition());
                    } else {
                        vb.setPositionInColl(1); // default
                    }

                    try {
                        getPw().addPortBinding(vb);
                    } catch (SQLException e) {
                        // Insert failed: fall back to updating the binding.
                        // A failure of the update itself is reported by the
                        // enclosing catch block.
                        logger.debug("Problem processing trees for workflow: "
                                + workflowId
                                + " instance: "
                                + workflowRunId
                                + " : updating instead of inserting");
                        getPw().updatePortBinding(vb);
                    }
                }
            } catch (SQLException e) {
                // Reported at warn level like the other database failures
                // in this method, since the node's record is lost
                logger.warn(
                        "Database problem processing trees for workflow: "
                                + workflowId + " instance: "
                                + workflowRunId + " port: " + portName, e);
            }
        }
    }

    // Describe the invocation of the dataflow that just finished
    DataflowInvocation invocation = new DataflowInvocation();
    invocation.setDataflowInvocationId(UUID.randomUUID().toString());
    invocation.setWorkflowId(workflowId);
    invocation.setWorkflowRunId(workflowRunId);

    // Link to the enclosing processor enactment when one is known for the
    // parent process
    String parentProcessId = parentProcess(processId, 1);
    if (parentProcessId != null) {
        ProcessorEnactment procAct = invocationProcessToProcessEnactment
                .get(parentProcessId);
        if (procAct != null) {
            invocation.setParentProcessorEnactmentId(procAct
                    .getProcessEnactmentId());
        }
    }
    invocation.setInvocationStarted(workflowStarted.get(completeEvent
            .getParentId()));
    invocation.setInvocationEnded(completeEvent.getInvocationEnded());
    // Constant-first comparison: a missing state counts as "not completed"
    // instead of raising a NullPointerException
    invocation.setCompleted(State.completed.equals(completeEvent
            .getState()));

    // Register data: all input ports share one data binding ID, all output
    // ports share another
    String inputsDataBindingId = UUID.randomUUID().toString();
    String outputsDataBindingId = UUID.randomUUID().toString();
    List<Port> ports = getPq().getPortsForDataflow(workflowId);
    for (Port port : ports) {
        String portKey = (port.isInputPort() ? "/i:" : "/o:")
                + port.getPortName();
        String t2Reference = workflowPortData.get(portKey);
        if (t2Reference == null) {
            logger.warn("No workflow port data for " + portKey);
            continue;
        }
        DataBinding dataBinding = new DataBinding();
        dataBinding
                .setDataBindingId(port.isInputPort() ? inputsDataBindingId
                        : outputsDataBindingId);
        dataBinding.setPort(port);
        dataBinding.setT2Reference(t2Reference);
        dataBinding.setWorkflowRunId(workflowRunId);
        try {
            getPw().addDataBinding(dataBinding);
        } catch (SQLException e) {
            logger.warn("Could not add databinding for " + portKey, e);
        }
    }

    invocation.setInputsDataBindingId(inputsDataBindingId);
    invocation.setOutputsDataBindingId(outputsDataBindingId);
    try {
        getPw().addDataflowInvocation(invocation);
    } catch (SQLException e) {
        logger.warn("Could not store dataflow invocation for " + processId,
                e);
    }
}