public void processTrees()

in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/WorkflowDataProcessor.java [117:264]


	public void processTrees(DataflowRunComplete completeEvent,
			String workflowRunId) {
		String workflowId = completeEvent.getWorkflowId();
		logger.debug("processing output trees");

		// i:inputPortName -> t2Ref
		Map<String, String> workflowPortData = new HashMap<>();

		for (Map.Entry<String, List<WorkflowDataNode>> entry : workflowDataTrees
				.entrySet()) {
			String portName = entry.getKey();
			List<WorkflowDataNode> tree = entry.getValue();

			PortBinding vb = null;

			logger.debug("storing tree for var "+portName+" in workflow with ID "+workflowId+" and instance "+workflowRunId);
			for (WorkflowDataNode node:tree) {
				try {
					if (!node.getWorkflowID().equals(workflowId))
						continue;

					if (node.getIndex().equals("[]")) {
						// Store top-level workflow inputs/outputs
						if (! node.getProcessId().equals(completeEvent.getProcessId()))
							//logger.warn("Unexpected process ID " + node.getProcessId() + " expected " + completeEvent.getProcessId());
							continue;
						String portKey = (node.isInputPort() ? "/i:" : "/o:") + node.getPortName();
						workflowPortData.put(portKey, node.getValue());
					}

					if (node.isList) {
						logger.debug("creating collection entry for "
								+ node.value + " with index " + node.index);

						if (node.getParent() != null) {
							logger.debug(" and parent " + node.parent.index);
							// write a collection record to DB
							getPw().addCollection(workflowId, node.getValue(),
									node.getParent().getValue(),
									node.getIndex(), portName, workflowRunId);
						} else
							getPw().addCollection(workflowId, node.getValue(),
									null, node.getIndex(), portName,
									workflowRunId);
					} else {
						logger.debug("creating PortBinding for " + node.value
								+ " with index " + node.index);

						vb = new PortBinding();

						vb.setWorkflowId(workflowId);
						vb.setWorkflowRunId(workflowRunId);

						vb.setProcessorName(pq.getWorkflow(workflowId)
								.getExternalName());

						// vb.setValueType(); // TODO not sure what to set this to
						vb.setPortName(portName);
						vb.setIteration(node.getIndex());
						vb.setValue(node.getValue());

						if (node.getParent() != null) {
							logger.debug(" in collection "
									+ node.getParent().value + " with index "
									+ node.getParent().getIndex());

							vb.setCollIDRef(node.getParent().getValue());
							vb.setPositionInColl(node.getRelativePosition());
						} else
							vb.setPositionInColl(1); // default

						try {
							getPw().addPortBinding(vb);
						} catch (SQLException e) {
							logger.debug("Problem processing trees for workflow: "
									+ workflowId
									+ " instance: "
									+ workflowRunId
									+ " : updating instead of inserting");
							getPw().updatePortBinding(vb);
						}
					}
				} catch (SQLException e) {
					logger.debug(
							"Database problem processing trees for workflow: "
									+ workflowId + " instance: "
									+ workflowRunId + " : " + workflowId, e);
				}
			}
		}

		List<Port> ports = getPq().getPortsForDataflow(workflowId);
		String processId = completeEvent.getProcessId();

		DataflowInvocation invocation = new DataflowInvocation();
		invocation.setDataflowInvocationId(UUID.randomUUID().toString());
		invocation.setWorkflowId(workflowId);
		invocation.setWorkflowRunId(workflowRunId);

		String parentProcessId = parentProcess(processId, 1);
		if (parentProcessId != null) {
			ProcessorEnactment procAct = invocationProcessToProcessEnactment
					.get(parentProcessId);
			if (procAct != null)
				invocation.setParentProcessorEnactmentId(procAct
						.getProcessEnactmentId());
		}

		invocation.setInvocationStarted(workflowStarted.get(completeEvent
				.getParentId()));
		invocation.setInvocationEnded(completeEvent.getInvocationEnded());
		invocation.setCompleted(completeEvent.getState()
				.equals(State.completed));

		// Register data
		String inputsDataBindingId = UUID.randomUUID().toString();
		String outputsDataBindingId = UUID.randomUUID().toString();
		for (Port port : ports) {
			String portKey = (port.isInputPort() ? "/i:" : "/o:")
					+ port.getPortName();
			String t2Reference = workflowPortData.get(portKey);
			if (t2Reference == null) {
				logger.warn("No workflow port data for " + portKey);
				continue;
			}
			DataBinding dataBinding = new DataBinding();
			dataBinding
					.setDataBindingId(port.isInputPort() ? inputsDataBindingId
							: outputsDataBindingId);
			dataBinding.setPort(port);
			dataBinding.setT2Reference(t2Reference);
			dataBinding.setWorkflowRunId(workflowRunId);
			try {
				pw.addDataBinding(dataBinding);
			} catch (SQLException e) {
				logger.warn("Could not add databinding for " + portKey, e);
			}
		}

		invocation.setInputsDataBindingId(inputsDataBindingId);
		invocation.setOutputsDataBindingId(outputsDataBindingId);
		try {
			pw.addDataflowInvocation(invocation);
		} catch (SQLException e) {
			logger.warn("Could not store dataflow invocation for " + processId,
					e);
		}
	}