in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/ProvenanceQuery.java [552:619]
/**
 * Counts the incoming datalinks of every processor in a workflow.
 *
 * <p>
 * Every processor recorded for {@code workflowId} starts with a count of 0.
 * The counts are then overwritten with the number of {@code Datalink} rows
 * whose destination is that processor. Destinations that are nested
 * dataflows of the parent workflow (as returned by
 * {@code getProcessorsShallow}) are excluded from the count, and the
 * processor whose first activity class equals {@code DATAFLOW_ACTIVITY} is
 * always reported with a count of 0.
 *
 * <p>
 * If obtaining the connection fails with a reflective exception, a warning
 * is logged and whatever has been collected so far (possibly an empty map)
 * is returned.
 *
 * @param workflowId
 *            identifier of the workflow whose processors are inspected
 * @return map from processor name to its number of incoming datalinks
 * @throws SQLException
 *             if a database access error occurs
 */
public Map<String, Integer> getProcessorsIncomingLinks(String workflowId)
		throws SQLException {
	Map<String, Integer> result = new HashMap<>();
	String currentWorkflowProcessor = null;
	String sql = "SELECT processorName, firstActivityClass FROM Processor "
			+ "WHERE workflowId = ?";
	try (Connection c = getConnection()) {
		try (PreparedStatement ps = c.prepareStatement(sql)) {
			ps.setString(1, workflowId);
			try (ResultSet rs = ps.executeQuery()) {
				while (rs.next()) {
					// PM CHECK 6/09
					String activityClass = rs.getString("firstActivityClass");
					String processorName = rs.getString("processorName");
					// null-safe: a NULL firstActivityClass is simply "not a dataflow"
					if (activityClass != null
							&& activityClass.equals(DATAFLOW_ACTIVITY)) {
						currentWorkflowProcessor = processorName;
						logger.info("currentWorkflowProcessor = "
								+ currentWorkflowProcessor);
					}
					result.put(processorName, 0);
				}
			}
		}
		/*
		 * fetch the name of the top-level dataflow. We use this to exclude
		 * datalinks outgoing from its inputs
		 */
		// CHECK below -- gets confused on nested workflows
		String parentWF = getParentOfWorkflow(workflowId);
		if (parentWF == null)
			parentWF = workflowId; // null parent means we are the top
		logger.debug("parent WF: " + parentWF);
		// get nested dataflows -- we want to avoid these in the toposort algorithm
		List<ProvenanceProcessor> procs = getProcessorsShallow(c,
				DATAFLOW_ACTIVITY, parentWF);

		// Build the query with placeholders only; values are bound below so
		// that names containing quotes cannot alter the statement.
		StringBuilder q = new StringBuilder(
				"SELECT destinationProcessorName, count(*) AS cnt ");
		q.append("FROM Datalink WHERE workflowId = ?");
		// An empty "NOT IN ()" list is not valid SQL, so the exclusion is
		// only added when there is at least one nested dataflow.
		if (!procs.isEmpty()) {
			q.append(" AND destinationProcessorName NOT IN (");
			String sep = "";
			for (int i = 0; i < procs.size(); i++) {
				q.append(sep).append("?");
				sep = ",";
			}
			q.append(")");
		}
		q.append(" GROUP BY destinationProcessorName");
		logger.info("executing \n" + q);

		try (PreparedStatement ps = c.prepareStatement(q.toString())) {
			int idx = 1;
			ps.setString(idx++, workflowId);
			for (ProvenanceProcessor p : procs)
				// String.valueOf keeps the textual form the concatenated
				// query used to embed for each name
				ps.setString(idx++, String.valueOf(p.getProcessorName()));
			try (ResultSet rs = ps.executeQuery()) {
				while (rs.next()) {
					String destination = rs
							.getString("destinationProcessorName");
					// skip NULL destinations and the workflow's own
					// dataflow processor, which is pinned to 0 below
					if (destination != null
							&& !destination.equals(currentWorkflowProcessor))
						result.put(destination, rs.getInt("cnt"));
				}
			}
		}
		// Only pin the dataflow processor when one was actually found;
		// otherwise a null key would end up in the result.
		if (currentWorkflowProcessor != null)
			result.put(currentWorkflowProcessor, 0);
	} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
		// NOTE(review): presumably raised by getConnection() while loading
		// the driver -- confirm; the partial result is returned as before.
		logger.warn("Could not execute query", e);
	}
	return result;
}