in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [272:488]
/**
 * Records the static structure of a dataflow through the provenance writer
 * {@code pw}, recursing into every nested dataflow it contains. The recorded
 * structure is the workflow itself, its processors with their ports, the
 * dataflow's own ports and its datalinks.
 * <p>
 * If {@code dataflowID} is already among the IDs returned by
 * {@code pq.getAllworkflowIds()}, only the workflow run is recorded and nested
 * dataflows are still visited. In that case processors, ports and datalinks
 * are not added again. If that lookup itself fails, the failure is logged and
 * the dataflow is treated as new.
 * <p>
 * Any other exception is logged and swallowed, so structure capture is
 * best-effort.
 *
 * @param df the dataflow to record
 * @param dataflowID identifier under which the dataflow is stored; this is
 *        NOT the workflow run ID
 * @param externalName processor name under which the dataflow's own input and
 *        output ports are registered; when {@code null},
 *        {@code INPUT_CONTAINER_PROCESSOR} and
 *        {@code OUTPUT_CONTAINER_PROCESSOR} are used instead. Nested dataflows
 *        are given the local name of their enclosing processor.
 * @return {@code dataflowID}
 */
private String processDataflowStructure(Dataflow df, String dataflowID, String externalName) {
    String localWorkflowRunID = getWorkflowRunId();
    try {
        // check whether we already have this WF in the DB
        boolean alreadyInDb;
        try {
            List<String> workflowIds = pq.getAllworkflowIds();
            alreadyInDb = workflowIds != null && workflowIds.contains(dataflowID);
        } catch (SQLException e) {
            logger.warn("Problem processing dataflow structure for " + dataflowID, e);
            alreadyInDb = false;
        }

        if (!alreadyInDb) {
            String parentDataflow = wfNestingMap.get(dataflowID);
            Blob blob = serialize(df);
            if (parentDataflow == null) {
                // top-level dataflow: recorded without a parent
                pw.addWFId(dataflowID, null, externalName, blob);
            } else {
                logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow);
                pw.addWFId(dataflowID, parentDataflow, externalName, blob);
                /*
                 * A nested workflow is recorded under its parent's run rather
                 * than the current run ID. The parent's own call recorded its
                 * run (addWorkflowRun below) before recursing into this one,
                 * so getRuns is expected to be non-empty here.
                 * UNCOMMENTED PM 9/09 CHECK
                 */
                localWorkflowRunID = pq.getRuns(parentDataflow, null).get(0).getWorkflowRunId();
            }
        }
        // Log the run itself
        pw.addWorkflowRun(dataflowID, localWorkflowRunID);

        // Processors with their ports; nested dataflows are recursed into.
        List<Port> vars = new ArrayList<Port>();
        for (Processor p : df.getProcessors()) {
            String pName = p.getLocalName();
            List<? extends Activity<?>> activities = p.getActivityList();
            if (!alreadyInDb) {
                // CHECK: the class of the first activity is used as the processor type
                String pType = null;
                if (activities != null && !activities.isEmpty()) {
                    pType = activities.get(0).getClass().getCanonicalName();
                }
                ProvenanceProcessor provProc = new ProvenanceProcessor();
                provProc.setIdentifier(UUID.randomUUID().toString());
                provProc.setProcessorName(pName);
                provProc.setFirstActivityClassName(pType);
                provProc.setWorkflowId(dataflowID);
                provProc.setTopLevelProcessor(false);
                pw.addProcessor(provProc);

                String procId = provProc.getIdentifier();
                for (ProcessorInputPort ip : p.getInputPorts()) {
                    vars.add(createPortRecord(procId, pName, dataflowID, ip.getName(), ip.getDepth(), true));
                }
                for (ProcessorOutputPort op : p.getOutputPorts()) {
                    vars.add(createPortRecord(procId, pName, dataflowID, op.getName(), op.getDepth(), false));
                }
            }
            /*
             * A NestedDataflow activity makes this processor a nested
             * workflow: remember child -> parent, then recurse on it.
             */
            if (activities != null) {
                for (Activity<?> a : activities) {
                    if (a instanceof NestedDataflow) {
                        Dataflow nested = ((NestedDataflow) a).getNestedDataflow();
                        wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent
                        processDataflowStructure(nested, nested.getIdentifier(), pName);
                    }
                }
            }
        }

        if (alreadyInDb) {
            // ports and datalinks were recorded when this dataflow was first seen
            return dataflowID;
        }

        /*
         * The dataflow's own ports are attached to a pseudo-processor: the
         * external name when one is given (for a nested dataflow, its
         * enclosing processor), otherwise the input/output container. The
         * same names must be used again when resolving datalinks below.
         */
        String inputContainer = externalName != null ? externalName : INPUT_CONTAINER_PROCESSOR;
        String outputContainer = externalName != null ? externalName : OUTPUT_CONTAINER_PROCESSOR;
        for (DataflowInputPort ip : df.getInputPorts()) {
            // null processor ID marks a workflow port (CHECK PM 11/08)
            vars.add(createPortRecord(null, inputContainer, dataflowID, ip.getName(), ip.getDepth(), true));
        }
        for (DataflowOutputPort op : df.getOutputPorts()) {
            vars.add(createPortRecord(null, outputContainer, dataflowID, op.getName(), op.getDepth(), false));
        }
        pw.addPorts(vars, dataflowID);
        makePortMapping(vars);

        // Datalinks: resolve both ends to the Port records registered above.
        for (Datalink l : df.getLinks()) {
            Port sourcePort = null;
            Port destinationPort = null;

            OutputPort source = l.getSource();
            if (source instanceof ProcessorOutputPort) {
                String sourcePname = ((ProcessorOutputPort) source).getProcessor().getLocalName();
                sourcePort = lookupPort(sourcePname, source.getName(), false);
            } else if (source instanceof MergeOutputPort) {
                // TODO: Handle merge output ports
            } else {
                // Assume it is the internal side of a DataflowInputPort
                sourcePort = lookupPort(inputContainer, source.getName(), true);
            }

            InputPort sink = l.getSink();
            if (sink instanceof ProcessorInputPort) {
                String sinkPname = ((ProcessorInputPort) sink).getProcessor().getLocalName();
                destinationPort = lookupPort(sinkPname, sink.getName(), true);
            } else if (sink instanceof MergeInputPort) {
                // TODO: Handle merge input ports
            } else {
                // Assume it is the internal side of a DataflowOutputPort
                destinationPort = lookupPort(outputContainer, sink.getName(), false);
            }

            if (sourcePort != null && destinationPort != null) {
                pw.addDataLink(sourcePort, destinationPort, dataflowID);
            } else {
                logger.info("Can't record datalink " + l);
            }
        }
    } catch (Exception e) {
        logger.error("Problem processing provenance for dataflow", e);
    }
    return dataflowID;
}

/**
 * Builds a provenance {@link Port} record with a fresh random identifier.
 *
 * @param processorId identifier of the owning provenance processor, or
 *        {@code null} for a port of the dataflow itself
 * @param processorName name the port is registered under
 * @param workflowId identifier of the dataflow the port belongs to
 * @param portName the port's name
 * @param depth the port's depth
 * @param isInput {@code true} for an input port, {@code false} for an output port
 * @return the new, populated record
 */
private static Port createPortRecord(String processorId, String processorName, String workflowId,
        String portName, int depth, boolean isInput) {
    Port port = new Port();
    port.setIdentifier(UUID.randomUUID().toString());
    port.setProcessorId(processorId);
    port.setProcessorName(processorName);
    port.setWorkflowId(workflowId);
    port.setPortName(portName);
    port.setDepth(depth);
    port.setInputPort(isInput);
    return port;
}