in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [1412:1512]
/**
 * Propagates resolved depths (logged as "anl") across the ports of the
 * workflow graph associated with the given run.
 * <p>
 * Phase I topologically sorts the processors starting from the top-level
 * dataflow. Phase II walks them in that order:
 * <ul>
 * <li>every input port whose resolved depth is not yet set receives its own
 * declared depth and is written back;</li>
 * <li>every output port receives its declared depth plus the sum, over the
 * processor's inputs, of the non-negative differences
 * (resolved depth - declared depth) -- the "sum formula" (see paper);</li>
 * <li>that output value is then copied onto the ports linked downstream of
 * the output, looking through nested-workflow boundary ports.</li>
 * </ul>
 *
 * @param workflowRunId
 *            identifier of the workflow run used to look up the top-level
 *            dataflow and to drive the topological sort
 * @throws SQLException
 *             propagated from the provenance query/writer calls made here
 *             (presumably on database access failure -- confirm against
 *             those APIs)
 */
public void propagateANL(String workflowRunId) throws SQLException {
    String top = pq.getTopLevelDataflowName(workflowRunId);

    // //////////////////////
    // PHASE I: toposort the processors in the whole graph
    // //////////////////////
    List<Pair> sorted = toposort(top, workflowRunId);

    logger.debug("final sorted list of processors");
    for (Pair p : sorted) {
        logger.debug(p.getV1() + " in workflowId " + p.getV2());
    }

    // //////////////////////
    // PHASE II: traverse and set anl on each port
    // //////////////////////
    // Each pair holds a processor name (V1) and the workflowId it lives in (V2);
    // they are processed in topological order.
    for (Pair pnameInContext : sorted) {
        String pname = pnameInContext.getV1();
        String workflowId = pnameInContext.getV2();

        // Inputs: set ANL to be the DNL if not set in prior steps, and
        // accumulate how much deeper than declared each input turned out to be.
        // NOTE(review): the original carried an unresolved "CHECK" remark on
        // this lookup about whether an instance should be used -- confirm.
        int totalANL = 0;
        for (Port iv : getPq().getInputPorts(pname, workflowId)) {
            if (!iv.isResolvedDepthSet()) {
                iv.setResolvedDepth(iv.getDepth());
                getPw().updatePort(iv);
            }
            // If the difference is negative then Taverna wraps the value into a
            // list --> use dnl(X) in this case, i.e. contribute nothing.
            // NOTE(review): original author flagged this clamp with "CHECK".
            totalANL += Math.max(0, iv.getResolvedDepth() - iv.getDepth());
            // The input's own resolved depth is not pushed to its successors
            // here; only output ports propagate along links below.
        }

        // Outputs: set ANL based on the sum formula (see paper).
        for (Port ov : getPq().getOutputPorts(pname, workflowId)) {
            int outputDepth = ov.getDepth() + totalANL;
            ov.setResolvedDepth(outputDepth);
            logger.debug("anl for " + pname + ":" + ov.getPortName() + " = " + outputDepth);
            getPw().updatePort(ov);

            // Propagate this through all the links from this port.
            for (Port v : getPq().getSuccPorts(pname, ov.getPortName(), workflowId)) {
                List<Port> toBeProcessed = new ArrayList<>();
                if (v.getProcessorId() != null) {
                    // Ordinary successor port: update it directly.
                    toBeProcessed.add(v);
                } else if (v.isInputPort()) {
                    // This is the input to a nested workflow: skip the boundary
                    // port and update its successors, looked up under the
                    // workflowId registered for that external name.
                    String nestedWorkflowId = pq.getWorkflowIdForExternalName(v.getProcessorName());
                    toBeProcessed.addAll(getPq().getSuccPorts(
                            v.getProcessorName(), v.getPortName(), nestedWorkflowId));
                } else {
                    // This is the output of a nested workflow: skip the boundary
                    // port and update its successors, looked up with a null
                    // workflowId.
                    toBeProcessed.addAll(getPq().getSuccPorts(
                            v.getProcessorName(), v.getPortName(), null));
                }

                for (Port v1 : toBeProcessed) {
                    v1.setResolvedDepth(ov.getResolvedDepth());
                    logger.debug("anl for " + v1.getProcessorName() + ":"
                            + v1.getPortName() + " = "
                            + ov.getResolvedDepth());
                    getPw().updatePort(v1);
                }
            }
        }
    }
}