in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [990:1051]
/**
 * Reconciles collection membership between each new port binding and the
 * bindings of the port that feeds it.
 * <p>
 * For every binding in {@code newBindings} the method looks up the data
 * links whose destination is the binding's processor/port, takes the source
 * of the first link found, and queries the bindings of that source port that
 * carry the same value in the same workflow run. When exactly one side of a
 * (successor, predecessor) pair has a collection reference, the reference
 * and the position in the collection are copied to the other side and that
 * binding is written back through {@code getPw().updatePortBinding(...)}.
 * <p>
 * A binding with no incoming data link is skipped; the remaining bindings
 * in the list are still processed.
 *
 * @param newBindings the port bindings to back-patch
 * @throws SQLException declared by this method; presumably raised by the
 *             underlying query/writer calls -- confirm against their
 *             signatures
 */
private void backpatchIterationResults(List<PortBinding> newBindings) throws SQLException {
    logger.debug("backpatchIterationResults: start");
    for (PortBinding vb : newBindings) {
        logger.debug("backpatchIterationResults: processing vb "
                + vb.getProcessorName() + "/" + vb.getPortName() + "="
                + vb.getValue());
        if (vb.getCollIDRef() != null) { // this is a member of a collection
            logger.debug("...which is inside a collection ");
        }

        // look for its antecedent
        Map<String, String> queryConstraints = new HashMap<>();
        queryConstraints.put("destinationPortName", vb.getPortName());
        queryConstraints.put("destinationProcessorName", vb.getProcessorName());
        // CHECK picking first element in list...
        queryConstraints.put("workflowId",
                pq.getWorkflowIdsForRun(vb.getWorkflowRunId()).get(0));
        List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);

        // there can be only one -- but check that there is one!
        if (incomingDataLinks.isEmpty()) {
            // No antecedent for THIS binding only: move on to the next one
            // rather than abandoning every binding that follows it.
            continue;
        }
        DataLink incomingLink = incomingDataLinks.get(0);
        String sourcePname = incomingLink.getSourceProcessorName();
        String sourceVname = incomingLink.getSourcePortName();
        logger.debug("antecedent: "+sourcePname+":"+sourceVname);

        // get the bindings of the antecedent port that hold the same value
        // in the same workflow run as the successor
        queryConstraints.clear();
        queryConstraints.put("VB.portName", sourceVname);
        queryConstraints.put("V.processorName", sourcePname);
        queryConstraints.put("VB.value", vb.getValue());
        queryConstraints.put("VB.workflowRunId", vb.getWorkflowRunId());

        // reconcile
        // NOTE: vb.getCollIDRef() is re-read on every pass on purpose -- the
        // second branch below mutates vb, which affects later iterations.
        for (PortBinding b : pq.getPortBindings(queryConstraints)) {
            logger.debug("backpatching " + sourceVname + " " + sourcePname);
            if (vb.getCollIDRef() != null && b.getCollIDRef() == null) {
                // successor is in a collection, predecessor is not:
                // patch the predecessor
                logger.debug("successor " + vb.getPortName()
                        + " is in collection " + vb.getCollIDRef()
                        + " but pred " + b.getPortName() + " is not");
                logger.debug("putting " + b.getPortName()
                        + " in collection " + vb.getCollIDRef()
                        + " at pos " + vb.getPositionInColl());
                b.setCollIDRef(vb.getCollIDRef());
                b.setPositionInColl(vb.getPositionInColl());
                getPw().updatePortBinding(b);
            } else if (vb.getCollIDRef() == null && b.getCollIDRef() != null) {
                // predecessor is in a collection, successor is not:
                // patch the successor
                logger.debug("successor " + vb.getPortName()
                        + " is NOT in collection but pred "
                        + b.getPortName() + " IS");
                logger.debug("putting " + vb.getPortName()
                        + " in collection " + b.getCollIDRef() + " at pos "
                        + b.getPositionInColl());
                vb.setCollIDRef(b.getCollIDRef());
                vb.setPositionInColl(b.getPositionInColl());
                getPw().updatePortBinding(vb);
            }
        }
    }
}