public void processProcessEvent()

in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [515:701]


	public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) {
		switch (provenanceItem.getEventType()) {
		case PROCESS_EVENT_TYPE: {
			String parentId = provenanceItem.getParentId();  // this is the workflowID
			String identifier = provenanceItem.getIdentifier();  // use this as workflowRunId if this is the top-level process

			parentChildMap.put(identifier, parentId);
			ProcessorBinding pb = new ProcessorBinding();
			pb.setWorkflowRunId(getWorkflowRunId());
			pb.setWorkflowId(currentWorkflowID);
			procBindingMap.put(identifier, pb);
			return;
		}
		case PROCESSOR_EVENT_TYPE: {
			String identifier = provenanceItem.getIdentifier();
			String parentId = provenanceItem.getParentId();
			String processID = provenanceItem.getProcessId(); // this is the external process ID

			// this has the weird form facade0:dataflowname:pname  need to extract pname from here
			String[] processName = processID.split(":");
			procBindingMap.get(parentId).setProcessorName(
					processName[processName.length - 1]);
			// 3rd component of composite name

			parentChildMap.put(identifier, parentId);
			return;
		}
		case ACTIVITY_EVENT_TYPE: {
			String identifier = provenanceItem.getIdentifier();
			String parentId = provenanceItem.getParentId();
			procBindingMap.get(parentChildMap.get(parentId))
					.setFirstActivityClassName(identifier);
			parentChildMap.put(identifier, parentId);
			return;
		}
		case ITERATION_EVENT_TYPE: {
			IterationProvenanceItem iterationProvenanceItem = (IterationProvenanceItem)provenanceItem;
			if (iterationProvenanceItem.getParentIterationItem() != null)
				// Skipping pipelined outputs, we'll process the parent output later instead
				return;

			// traverse up to root to retrieve ProcBinding that was created when we saw the process event
			String activityID = provenanceItem.getParentId();
			String processorID = parentChildMap.get(activityID);
			String processID = parentChildMap.get(processorID);
			String iterationID = provenanceItem.getIdentifier();
			parentChildMap.put(iterationID, activityID);

			ProcessorEnactment processorEnactment = processorEnactmentMap
					.get(iterationID);
			if (processorEnactment == null)
				processorEnactment = new ProcessorEnactment();

			ProcessorBinding procBinding = procBindingMap.get(processID);

			String itVector = extractIterationVector(iterationToString(iterationProvenanceItem
					.getIteration()));
			procBinding.setIterationVector(itVector);

			processorEnactment.setEnactmentStarted(iterationProvenanceItem
					.getEnactmentStarted());
			processorEnactment.setEnactmentEnded(iterationProvenanceItem
					.getEnactmentEnded());
			processorEnactment.setWorkflowRunId(workflowRunId);
			processorEnactment.setIteration(itVector);

			String processId = iterationProvenanceItem.getProcessId();
			String parentProcessId = parentProcess(processId, 3);
			if (parentProcessId != null) {
				ProcessorEnactment parentProcEnact = getWfdp().invocationProcessToProcessEnactment
						.get(parentProcessId);
				if (parentProcEnact != null)
					processorEnactment
							.setParentProcessorEnactmentId(parentProcEnact
									.getProcessEnactmentId());
			}
			processorEnactment.setProcessEnactmentId(iterationProvenanceItem
					.getIdentifier());
			processorEnactment.setProcessIdentifier(processId);

			ProvenanceProcessor provenanceProcessor;
			if (processorEnactment.getProcessorId() == null) {
				provenanceProcessor = pq.getProvenanceProcessorByName(
						currentWorkflowID, procBinding.getProcessorName());
				if (provenanceProcessor == null)
					// already logged warning
					return;
				processorMapById.put(provenanceProcessor.getIdentifier(),
						provenanceProcessor);
				processorEnactment.setProcessorId(provenanceProcessor
						.getIdentifier());
			} else {
				provenanceProcessor = processorMapById.get(processorEnactment
						.getProcessorId());
				if (provenanceProcessor == null) {
					provenanceProcessor = pq
							.getProvenanceProcessorById(processorEnactment
									.getProcessorId());
					processorMapById.put(provenanceProcessor.getIdentifier(),
							provenanceProcessor);
				}
			}

			InputDataProvenanceItem inputDataEl = iterationProvenanceItem.getInputDataItem();
			OutputDataProvenanceItem outputDataEl = iterationProvenanceItem.getOutputDataItem();

			if (inputDataEl != null
					&& processorEnactment.getInitialInputsDataBindingId() == null) {
				processorEnactment
						.setInitialInputsDataBindingId(processDataBindings(
								inputDataEl, provenanceProcessor));
				processInput(inputDataEl, procBinding, currentWorkflowID);
			}

			if (outputDataEl != null
					&& processorEnactment.getFinalOutputsDataBindingId() == null) {
				processorEnactment
						.setFinalOutputsDataBindingId(processDataBindings(
								outputDataEl, provenanceProcessor));
				processOutput(outputDataEl, procBinding, currentWorkflowID);
			}

			try {
				if (processorEnactmentMap.containsKey(iterationID)) {
					getPw().updateProcessorEnactment(processorEnactment);
				} else {
					getPw().addProcessorEnactment(processorEnactment);
					processorEnactmentMap.put(iterationID, processorEnactment);
				}
			} catch (SQLException e) {
				logger.warn("Could not store processor enactment", e);
			}
			return;
		}
		case END_WORKFLOW_EVENT_TYPE: {
			DataflowRunComplete completeEvent = (DataflowRunComplete) provenanceItem;
			// use this event to do housekeeping on the input/output varbindings

			// process the input and output values accumulated by WorkflowDataProcessor
			getWfdp().processTrees(completeEvent, getWorkflowRunId());

			reconcileLocalOutputs(provenanceItem.getWorkflowId());

			if (! provenanceItem.getProcessId().contains(":")) {
				// Top-level workflow finished
				// No longer needed, done by processTrees()
//				patchTopLevelnputs();

				workflowStructureDone = false; // CHECK reset for next run...
//				reconcileTopLevelOutputs(); // Done by reconcileLocalOutputs
				getPw().closeCurrentModel();  // only real impl is for RDF
			}
			return;
		}
		case WORKFLOW_DATA_EVENT_TYPE: {
			// give this event to a WorkflowDataProcessor object for pre-processing
			//			try {
			// TODO may generate an exception when the data is an error CHECK
			getWfdp().addWorkflowDataItem(provenanceItem);
			//			} catch (NumberFormatException e) {
			//			logger.error(e);
			//			}
			//			logger.info("Received workflow data - not processing");
			//FIXME not sure  - needs to be stored somehow
			return;
		}
		case INVOCATION_STARTED_EVENT_TYPE: {
			InvocationStartedProvenanceItem startedItem = (InvocationStartedProvenanceItem) provenanceItem;
			ProcessorEnactment processorEnactment = processorEnactmentMap
					.get(startedItem.getParentId());
			if (processorEnactment == null) {
				logger.error("Could not find ProcessorEnactment for invocation "
						+ startedItem);
				return;
			}
			getWfdp().invocationProcessToProcessEnactment.put(
					startedItem.getInvocationProcessId(), processorEnactment);
			return;
		}
		case ERROR_EVENT_TYPE:
			//TODO process the error
			return;
		default:
			// TODO broken, should we throw something here?
			return;
		}
	}