in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [835:956]
/**
 * Reconciles the port bindings and collection records of each top-level
 * output port with those of the port that feeds it.
 * <p>
 * For every output port O returned by
 * {@code pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID)}:
 * <ol>
 * <li>Look up the data link whose destination is O. If there is none, O is
 * skipped. Only the first link returned is used; its source processor/port
 * is the predecessor Y.</li>
 * <li>Fetch all port bindings recorded for Y in the current workflow run.</li>
 * <li>For each binding of Y, look for a binding of O with the same iteration
 * (and, when the Y binding carries collection information, the same
 * collection reference and position):
 * <ul>
 * <li>if one is found, copy its collection reference and position onto the
 * Y binding and update the Y binding;</li>
 * <li>otherwise re-label the Y binding with O's processor and port name and
 * insert it as a new binding.</li>
 * </ul>
 * </li>
 * <li>Fetch all nested-list (collection) nodes recorded for O and re-record
 * them against Y via {@code replaceCollectionRecord}.</li>
 * </ol>
 * The algorithm assumes the bindings of O are a subset of the bindings of Y.
 * <p>
 * A {@link SQLException} raised by any query or write is logged at WARN level
 * and not propagated; processing stops at the point of failure.
 *
 * @param dataflowID
 *            not read by this method; the ports are looked up using the
 *            {@code topLevelDataflowName} / {@code topLevelDataflowID} fields
 */
public void reconcileLocalOutputs(String dataflowID) {
    // A single constraint map is reused for every query; it is cleared
    // before each use so constraints never leak from one query to the next.
    Map<String, String> queryConstraints = new HashMap<>();
    try {
        // for each output O
        for (Port output : pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID)) {
            // find the data link(s) arriving at O
            queryConstraints.clear();
            queryConstraints.put("destinationPortId", output.getIdentifier());
            queryConstraints.put("workflowId", output.getWorkflowId());
            List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);

            // there should be exactly one -- but check that there is one!
            if (incomingDataLinks.isEmpty()) {
                continue;
            }
            DataLink incomingLink = incomingDataLinks.get(0);
            String sourcePname = incomingLink.getSourceProcessorName();
            String sourceVname = incomingLink.getSourcePortName();

            // collect all bindings for the immediate predecessor Y of O
            queryConstraints.clear();
            queryConstraints.put("V.portName", sourceVname);
            queryConstraints.put("V.processorName", sourcePname);
            queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
            queryConstraints.put("V.workflowId", topLevelDataflowID);
            List<PortBinding> YValues = pq.getPortBindings(queryConstraints);

            // for each Y binding look for a match among O's bindings
            // (assume the Y bindings are a superset of O's bindings)
            for (PortBinding yValue : YValues) {
                queryConstraints.clear();
                queryConstraints.put("V.portName", output.getPortName());
                queryConstraints.put("V.processorName", output.getProcessorName());
                queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
                // Same key spelling as the Y query above (was "V.workflowid").
                queryConstraints.put("V.workflowId", topLevelDataflowID);
                queryConstraints.put("VB.iteration", yValue.getIteration());
                // Only constrain on collection membership when Y has it;
                // a Y binding may be missing its collection information.
                if (yValue.getCollIDRef() != null) {
                    queryConstraints.put("VB.collIDRef", yValue.getCollIDRef());
                    queryConstraints.put("VB.positionInColl",
                            Integer.toString(yValue.getPositionInColl()));
                }
                List<PortBinding> matchingOValues = pq.getPortBindings(queryConstraints);

                // at most one match is expected; only the first is used
                if (!matchingOValues.isEmpty()) {
                    PortBinding oValue = matchingOValues.get(0);
                    // copy collection info from the O binding to the Y binding
                    yValue.setCollIDRef(oValue.getCollIDRef());
                    yValue.setPositionInColl(oValue.getPositionInColl());
                    pw.updatePortBinding(yValue);
                } else {
                    // no O binding yet: re-label the Y binding with the
                    // output's processor/port name and insert it for O
                    yValue.setProcessorName(output.getProcessorName());
                    yValue.setPortName(output.getPortName());
                    pw.addPortBinding(yValue);
                }
            } // for each yValue in YValues

            // copy all collection records for O to Y:
            // get all collection refs for O
            queryConstraints.clear();
            queryConstraints.put("workflowRunId", getWorkflowRunId());
            queryConstraints.put("processorNameRef", output.getProcessorName());
            queryConstraints.put("portName", output.getPortName());
            List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints);

            // insert back as collection refs for Y
            for (NestedListNode nln : oCollections) {
                nln.setProcessorName(sourcePname);
                // Previously this called setProcessorName a second time,
                // overwriting the processor name with the port name and
                // leaving the node's port name pointing at O.
                nln.setPortName(sourceVname);
                getPw().replaceCollectionRecord(nln, sourcePname, sourceVname);
            }
        } // for each output port
    } catch (SQLException e) {
        logger.warn("Problem reconciling top level outputs", e);
    }
}