in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/ProvenanceAnalysis.java [474:802]
private void xformStep(
String workflowRunId,
String workflowId,
Port outputVar, // we need the dnl from this output var
String proc, String path,
List<ProvenanceProcessor> selectedProcessors,
List<LineageSQLQuery> lqList) throws SQLException {
// retrieve input vars for current processor
Map<String, String> varsQueryConstraints = new HashMap<>();
List<Port> inputVars = null;
		/*
		 * here we fetch the input vars for the current proc. however, we may be
		 * looking at a dataflow port (for the entire dataflow or for a
		 * subdataflow) rather than a real processor. in that case we treat the
		 * dataflow as a special processor that does nothing -- the "input var"
		 * is simply a copy of the output port, and we are ready for the next
		 * xfer step. this lets us seamlessly traverse the graph across the
		 * intermediate I/O ports that belong to nested dataflows.
		 */
if (getPq().isDataflow(proc)) { // if we are looking at the output of an entire dataflow
// force the "input vars" for this step to be the output var itself
// this causes the following xfer step to trace back to the next processor _within_ proc
inputVars = new ArrayList<>();
inputVars.add(outputVar);
} else if (proc.equals(OUTPUT_CONTAINER_PROCESSOR)) { // same action as prev case, but may change in the future
inputVars = new ArrayList<>();
inputVars.add(outputVar);
} else {
varsQueryConstraints.put("W.workflowId", workflowId);
varsQueryConstraints.put("processorName", proc);
varsQueryConstraints.put("isInputPort", "1");
inputVars = getPq().getPorts(varsQueryConstraints);
}
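		// at this point inputVars holds either a single copy of the output
		// port (dataflow / output-container case) or the processor's real
		// input ports as fetched from the database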
///////////
/// path projections
///////////
// maps each var to its projected path
Map<Port,String> var2Path = new HashMap<>();
Map<Port,Integer> var2delta = new HashMap<>();
if (path == null) { // nothing to split
for (Port inputVar : inputVars)
var2Path.put(inputVar, null);
} else {
int minPathLength = 0; // if input path is shorter than this we give up granularity altogether
for (Port inputVar : inputVars) {
int resolvedDepth = 0;
if (inputVar.getResolvedDepth() != null)
resolvedDepth = inputVar.getResolvedDepth();
int delta = resolvedDepth - inputVar.getDepth();
var2delta.put(inputVar, delta);
minPathLength += delta;
}
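			// each delta is the number of iteration indices contributed by one
			// input port (resolved depth minus declared depth); minPathLength
			// is their sum, i.e. the shortest path we can still project over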
			String[] iterationVector = path.split(",");
if (iterationVector.length < minPathLength) { // no path is propagated
for (Port inputVar: inputVars)
var2Path.put(inputVar, null);
} else { // compute projected paths
String[] projectedPath;
int start = 0;
for (Port inputVar: inputVars) {
// 24/7/08 get DNL (declared nesting level) and ANL (actual nesting level) from VAR
// TODO account for empty paths
int projectedPathLength = var2delta.get(inputVar); // this is delta
if (projectedPathLength == 0) {
// associate empty path to this var
var2Path.put(inputVar, null);
continue;
}
// this var is involved in iteration
projectedPath = new String[projectedPathLength];
for (int i = 0; i < projectedPathLength; i++)
projectedPath[i] = iterationVector[start + i];
start += projectedPathLength;
					StringBuilder iterationFragment = new StringBuilder();
					for (String s : projectedPath)
						iterationFragment.append(s).append(',');
					iterationFragment.deleteCharAt(iterationFragment.length() - 1);
var2Path.put(inputVar, iterationFragment.toString());
}
}
}
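		// illustrative trace (hypothetical values, not from the original source):
		//   path      = "0,1,2"
		//   inputVars = [a (delta 1), b (delta 2)]
		//   var2Path  = { a -> "0", b -> "1,2" }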
// accumulate this proc to current path
currentPath.add(proc);
/*
* if this is a selected processor, add a copy of the current path to
* the list of paths for the processor
*/
// is <workflowId, proc> in selectedProcessors?
boolean isSelected = false;
for (ProvenanceProcessor pp : selectedProcessors)
if (pp.getWorkflowId().equals(workflowId)
&& pp.getProcessorName().equals(proc)) {
List<List<String>> paths = validPaths.get(pp);
// copy the path since the original will change
// also remove spurious dataflow processors at this point
List<String> pathCopy = new ArrayList<>();
for (String s : currentPath)
if (!getPq().isDataflow(s))
pathCopy.add(s);
paths.add(pathCopy);
isSelected = true;
break;
}
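		// isSelected now records whether <workflowId, proc> was among the
		// user-selected processors; it gates the SQL generation and OPM
		// updates below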
///////////
/// generate SQL if necessary -- for all input vars, based on the current path
/// the projected paths are required to determine the level in the collection at which
/// we look at the value assignment
///////////
Map<String, ProvenanceArtifact> var2Artifact = new HashMap<>();
Map<String, ProvenanceRole> var2ArtifactRole = new HashMap<>();
// if this transformation is important to the user, produce an output and also an OPM graph fragment
if (selectedProcessors.isEmpty() || isSelected) {
List<LineageSQLQuery> newLqList = getPq().lineageQueryGen(
workflowRunId, proc, var2Path, outputVar, path,
isReturnOutputs() || var2Path.isEmpty());
lqList.addAll(newLqList);
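			// the generated queries are both handed back to the caller via
			// lqList and executed below to drive the OPM updates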
// BEGIN OPM update section
//
// create OPM artifact and role for the output var of this xform
//
boolean doOPM = (aOPMManager != null && aOPMManager.isActive()); // any problem below will set this to false
if (doOPM) {
// fetch value for this variable and assert it as an Artifact in the OPM graph
Map<String, String> vbConstraints = new HashMap<>();
vbConstraints.put("VB.processorNameRef",
outputVar.getProcessorName());
vbConstraints.put("VB.portName", outputVar.getPortName());
vbConstraints.put("VB.workflowRunId", workflowRunId);
if (path != null) {
/*
* account for x,y,.. format as well as [x,y,...] depending
* on where the request is coming from
*/
					// TODO this is just irritating and must be removed
if (path.startsWith("["))
vbConstraints.put("VB.iteration", path);
else
vbConstraints.put("VB.iteration", "[" + path + "]");
}
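				// whichever format the caller used, the VB.iteration
				// constraint always ends up in the bracketed form, e.g. a
				// (hypothetical) path "0,1" is queried as "[0,1]"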
List<PortBinding> vbList = getPq().getPortBindings(vbConstraints); // DB
				/*
				 * use only the first result (we expect only one); note that
				 * path may be null here, in which case no iteration constraint
				 * was added above
				 */
// map the resulting varBinding to an Artifact
				if (vbList == null || vbList.isEmpty()) {
logger.debug("no entry corresponding to conditions: proc="
+ outputVar.getProcessorName() + " var = "
+ outputVar.getPortName() + " iteration = " + path);
doOPM = false;
} else {
PortBinding vb = vbList.get(0);
if (aOPMManager != null && !pq.isDataflow(proc)) {
if (isRecordArtifactValues()) {
T2Reference ref = getInvocationContext()
.getReferenceService().referenceFromString(
vb.getValue());
Object data = ic.getReferenceService()
.renderIdentifier(ref, Object.class, ic);
// ReferenceSetImpl o = (ReferenceSetImpl) ic.getReferenceService().resolveIdentifier(ref, null, ic);
logger.debug("deref value for ref: " + ref + " "
+ data + " of class "
+ data.getClass().getName());
try {
aOPMManager.addArtifact(vb.getValue(), data);
} catch (ProvenanceException e) {
logger.warn("Could not add artifact", e);
}
} else {
try {
aOPMManager.addArtifact(vb.getValue());
} catch (ProvenanceException e) {
logger.warn("Could not add artifact", e);
}
}
aOPMManager.createRole(vb.getWorkflowRunId(),
vb.getWorkflowId(), vb.getProcessorName(),
vb.getIteration());
}
/*
* assert proc as Process -- include iteration vector to
* separate different activations of the same process
*/
try {
aOPMManager.addProcess(proc, vb.getIteration(),
workflowId, vb.getWorkflowRunId());
} catch (ProvenanceException e) {
logger.warn("Could not add process", e);
}
					/*
					 * create the OPM generatedBy property between the output
					 * value and this process node. Avoid the pathological case
					 * where a dataflow generates its own inputs.
					 */
try {
aOPMManager.assertGeneratedBy(
aOPMManager.getCurrentArtifact(),
aOPMManager.getCurrentProcess(),
aOPMManager.getCurrentRole(),
aOPMManager.getCurrentAccount(), true);
} catch (ProvenanceException e) {
logger.warn("Could not add assertion", e);
}
}
}
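			// by this point (when doOPM is still true) the process for this
			// xform has been asserted and, for non-dataflow processors, the
			// output value's artifact and role have been added and linked via
			// generatedBy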
			//
			// run the lineage queries; when OPM is on, also add the input
			// artifacts and 'used' edges for this xform
			//
for (LineageSQLQuery lq : newLqList) {
				// execute the query; when OPM is on, its results supply the values needed for the input Artifact nodes
Dependencies inputs = getPq().runLineageQuery(lq,
isIncludeDataValue());
				if (doOPM && !inputs.getRecords().isEmpty()) { // && !pq.isDataflow(proc)
// update OPM graph with inputs and used properties
for (LineageQueryResultRecord resultRecord: inputs.getRecords()) {
// process inputs only
if (!resultRecord.isInputPort())
continue;
// map each input var in the resultRecord to an Artifact
// create new Resource for the resultRecord
// use the value as URI for the Artifact, and resolvedValue as the actual value
//
// create OPM artifact and role for the input var obtained by path projection
//
if (resultRecord.isCollection()) {
try {
aOPMManager.addArtifact(resultRecord
.getCollectionT2Reference());
} catch (ProvenanceException e) {
logger.warn("Could not add artifact", e);
}
} else if (isRecordArtifactValues()) {
T2Reference ref = getInvocationContext()
.getReferenceService().referenceFromString(
resultRecord.getValue());
Object data = ic.getReferenceService()
.renderIdentifier(ref, Object.class, ic);
logger.debug("deref value for ref: " + ref + " "
+ data + " of class "
+ data.getClass().getName());
try {
aOPMManager.addArtifact(
resultRecord.getValue(), data);
} catch (ProvenanceException e) {
logger.warn("Could not add artifact", e);
}
} else {
try {
aOPMManager
.addArtifact(resultRecord.getValue());
} catch (ProvenanceException e) {
logger.warn("Could not add artifact", e);
}
var2Artifact.put(resultRecord.getPortName(),
aOPMManager.getCurrentArtifact());
aOPMManager.createRole(
resultRecord.getWorkflowRunId(),
resultRecord.getworkflowId(),
resultRecord.getProcessorName(),
resultRecord.getIteration());
var2ArtifactRole.put(resultRecord.getPortName(),
aOPMManager.getCurrentRole());
//
// create OPM used property between process and the input var obtained by path projection
//
							// avoid output variables -- otherwise we would assert that P used one of its own outputs!
try {
aOPMManager.assertUsed(
aOPMManager.getCurrentArtifact(),
aOPMManager.getCurrentProcess(),
aOPMManager.getCurrentRole(),
aOPMManager.getCurrentAccount(), true);
} catch (ProvenanceException e) {
								logger.warn("Could not add assertion", e);
}
							// the final 'true' argument asks assertUsed to prevent duplicates -- CHECK
}
}
}
// END OPM update section
}
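			// note: we are still inside the isSelected branch, so the
			// recursion below only happens when this processor was selected
			// (or when no selection was given)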
			// recursion -- the xfer step is next: trace each input var back to the processor that produced it
for (Port inputVar : inputVars)
xferStep(workflowRunId, workflowId, inputVar,
var2Path.get(inputVar), selectedProcessors, lqList);
}
		currentPath.remove(currentPath.size() - 1); // pop this proc off the current path on the way out -- CHECK
} // end xformStep