in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [515:701]
/**
 * Handles one provenance event, dispatching on
 * {@code provenanceItem.getEventType()}.
 * <p>
 * Events are linked child-to-parent in {@code parentChildMap} (process,
 * then processor, then activity, then iteration) so that an iteration event
 * can walk back up to the {@link ProcessorBinding} created for its process
 * event:
 * <ul>
 * <li><b>process</b>: creates a {@code ProcessorBinding} for the run and
 * registers it in {@code procBindingMap} under the event identifier.</li>
 * <li><b>processor</b>: sets the processor name on the parent's binding,
 * using the last {@code ':'}-separated component of the process ID.</li>
 * <li><b>activity</b>: sets the first-activity class name on the binding of
 * the grandparent process.</li>
 * <li><b>iteration</b>: fills in a {@link ProcessorEnactment} (creating it
 * on first sight, updating it afterwards), processes its input/output data
 * and stores it through {@code getPw()}.</li>
 * <li><b>end of workflow</b>: lets {@code getWfdp()} process the accumulated
 * data, reconciles local outputs and, for a top-level workflow, resets
 * {@code workflowStructureDone} and closes the current model.</li>
 * <li><b>workflow data</b>: hands the item to {@code getWfdp()}.</li>
 * <li><b>invocation started</b>: maps the invocation process ID to the
 * enactment registered for the parent iteration.</li>
 * <li><b>error</b> and unknown types are currently ignored.</li>
 * </ul>
 * An event whose expected binding or processor cannot be found (for example
 * because its parent event was never recorded) is logged and dropped rather
 * than failing with a {@code NullPointerException}.
 *
 * @param provenanceItem
 *            the event to process
 * @param currentWorkflowID
 *            ID of the workflow the event belongs to; stored on new
 *            bindings and used to look up processors by name
 */
public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) {
    switch (provenanceItem.getEventType()) {
    case PROCESS_EVENT_TYPE: {
        String parentId = provenanceItem.getParentId(); // this is the workflowID
        String identifier = provenanceItem.getIdentifier(); // use this as workflowRunId if this is the top-level process
        parentChildMap.put(identifier, parentId);
        ProcessorBinding pb = new ProcessorBinding();
        pb.setWorkflowRunId(getWorkflowRunId());
        pb.setWorkflowId(currentWorkflowID);
        procBindingMap.put(identifier, pb);
        return;
    }
    case PROCESSOR_EVENT_TYPE: {
        String identifier = provenanceItem.getIdentifier();
        String parentId = provenanceItem.getParentId();
        String processID = provenanceItem.getProcessId(); // this is the external process ID
        // this has the weird form facade0:dataflowname:pname need to extract pname from here
        String[] processName = processID.split(":");
        ProcessorBinding parentBinding = procBindingMap.get(parentId);
        if (parentBinding == null) {
            // No process event was recorded for the parent: nothing to name
            logger.error("Could not find ProcessorBinding for process "
                    + parentId + " of processor event " + identifier);
            return;
        }
        // 3rd component of composite name
        parentBinding.setProcessorName(processName[processName.length - 1]);
        parentChildMap.put(identifier, parentId);
        return;
    }
    case ACTIVITY_EVENT_TYPE: {
        String identifier = provenanceItem.getIdentifier();
        String parentId = provenanceItem.getParentId();
        // parent is the processor event; its parent is the process event
        // that owns the binding
        ProcessorBinding processBinding = procBindingMap.get(parentChildMap
                .get(parentId));
        if (processBinding == null) {
            logger.error("Could not find ProcessorBinding for activity event "
                    + identifier + " with parent " + parentId);
            return;
        }
        processBinding.setFirstActivityClassName(identifier);
        parentChildMap.put(identifier, parentId);
        return;
    }
    case ITERATION_EVENT_TYPE: {
        IterationProvenanceItem iterationProvenanceItem = (IterationProvenanceItem) provenanceItem;
        if (iterationProvenanceItem.getParentIterationItem() != null)
            // Skipping pipelined outputs, we'll process the parent output later instead
            return;

        // traverse up to root to retrieve ProcBinding that was created when we saw the process event
        String activityID = provenanceItem.getParentId();
        String processorID = parentChildMap.get(activityID);
        String processID = parentChildMap.get(processorID);
        String iterationID = provenanceItem.getIdentifier();
        parentChildMap.put(iterationID, activityID);

        // Reuse the enactment if this iteration has been seen before
        ProcessorEnactment processorEnactment = processorEnactmentMap
                .get(iterationID);
        if (processorEnactment == null)
            processorEnactment = new ProcessorEnactment();

        ProcessorBinding procBinding = procBindingMap.get(processID);
        if (procBinding == null) {
            logger.error("Could not find ProcessorBinding for process "
                    + processID + " of iteration event " + iterationID);
            return;
        }

        String itVector = extractIterationVector(iterationToString(iterationProvenanceItem
                .getIteration()));
        procBinding.setIterationVector(itVector);

        processorEnactment.setEnactmentStarted(iterationProvenanceItem
                .getEnactmentStarted());
        processorEnactment.setEnactmentEnded(iterationProvenanceItem
                .getEnactmentEnded());
        processorEnactment.setWorkflowRunId(workflowRunId);
        processorEnactment.setIteration(itVector);

        // Link to the enclosing enactment, if one was registered by an
        // earlier invocation-started event
        String processId = iterationProvenanceItem.getProcessId();
        String parentProcessId = parentProcess(processId, 3);
        if (parentProcessId != null) {
            ProcessorEnactment parentProcEnact = getWfdp().invocationProcessToProcessEnactment
                    .get(parentProcessId);
            if (parentProcEnact != null)
                processorEnactment
                        .setParentProcessorEnactmentId(parentProcEnact
                                .getProcessEnactmentId());
        }
        processorEnactment.setProcessEnactmentId(iterationProvenanceItem
                .getIdentifier());
        processorEnactment.setProcessIdentifier(processId);

        ProvenanceProcessor provenanceProcessor;
        if (processorEnactment.getProcessorId() == null) {
            // First time: resolve the processor by name within the workflow
            provenanceProcessor = pq.getProvenanceProcessorByName(
                    currentWorkflowID, procBinding.getProcessorName());
            if (provenanceProcessor == null)
                // already logged warning
                return;
            processorMapById.put(provenanceProcessor.getIdentifier(),
                    provenanceProcessor);
            processorEnactment.setProcessorId(provenanceProcessor
                    .getIdentifier());
        } else {
            // Known processor ID: try the local cache before querying
            provenanceProcessor = processorMapById.get(processorEnactment
                    .getProcessorId());
            if (provenanceProcessor == null) {
                provenanceProcessor = pq
                        .getProvenanceProcessorById(processorEnactment
                                .getProcessorId());
                if (provenanceProcessor == null) {
                    // Same outcome as the by-name lookup above: give up on
                    // this event instead of dereferencing null
                    logger.warn("Could not find ProvenanceProcessor "
                            + processorEnactment.getProcessorId()
                            + " for iteration event " + iterationID);
                    return;
                }
                processorMapById.put(provenanceProcessor.getIdentifier(),
                        provenanceProcessor);
            }
        }

        // Data bindings are only processed once per enactment: skip if the
        // corresponding binding ID has already been set
        InputDataProvenanceItem inputDataEl = iterationProvenanceItem.getInputDataItem();
        OutputDataProvenanceItem outputDataEl = iterationProvenanceItem.getOutputDataItem();
        if (inputDataEl != null
                && processorEnactment.getInitialInputsDataBindingId() == null) {
            processorEnactment
                    .setInitialInputsDataBindingId(processDataBindings(
                            inputDataEl, provenanceProcessor));
            processInput(inputDataEl, procBinding, currentWorkflowID);
        }
        if (outputDataEl != null
                && processorEnactment.getFinalOutputsDataBindingId() == null) {
            processorEnactment
                    .setFinalOutputsDataBindingId(processDataBindings(
                            outputDataEl, provenanceProcessor));
            processOutput(outputDataEl, procBinding, currentWorkflowID);
        }

        try {
            if (processorEnactmentMap.containsKey(iterationID)) {
                getPw().updateProcessorEnactment(processorEnactment);
            } else {
                getPw().addProcessorEnactment(processorEnactment);
                processorEnactmentMap.put(iterationID, processorEnactment);
            }
        } catch (SQLException e) {
            logger.warn("Could not store processor enactment", e);
        }
        return;
    }
    case END_WORKFLOW_EVENT_TYPE: {
        DataflowRunComplete completeEvent = (DataflowRunComplete) provenanceItem;
        // use this event to do housekeeping on the input/output varbindings
        // process the input and output values accumulated by WorkflowDataProcessor
        getWfdp().processTrees(completeEvent, getWorkflowRunId());
        reconcileLocalOutputs(provenanceItem.getWorkflowId());

        if (!provenanceItem.getProcessId().contains(":")) {
            // Top-level workflow finished.
            // patchTopLevelnputs() is no longer needed, done by processTrees();
            // reconcileTopLevelOutputs() is done by reconcileLocalOutputs().
            workflowStructureDone = false; // CHECK reset for next run...
            getPw().closeCurrentModel(); // only real impl is for RDF
        }
        return;
    }
    case WORKFLOW_DATA_EVENT_TYPE: {
        // give this event to a WorkflowDataProcessor object for pre-processing
        // TODO may generate an exception when the data is an error CHECK
        // FIXME not sure - needs to be stored somehow
        getWfdp().addWorkflowDataItem(provenanceItem);
        return;
    }
    case INVOCATION_STARTED_EVENT_TYPE: {
        InvocationStartedProvenanceItem startedItem = (InvocationStartedProvenanceItem) provenanceItem;
        // The parent of an invocation-started event is the iteration whose
        // enactment was registered in processorEnactmentMap
        ProcessorEnactment processorEnactment = processorEnactmentMap
                .get(startedItem.getParentId());
        if (processorEnactment == null) {
            logger.error("Could not find ProcessorEnactment for invocation "
                    + startedItem);
            return;
        }
        getWfdp().invocationProcessToProcessEnactment.put(
                startedItem.getInvocationProcessId(), processorEnactment);
        return;
    }
    case ERROR_EVENT_TYPE:
        // TODO process the error
        return;
    default:
        // TODO broken, should we throw something here?
        return;
    }
}