public void reconcileLocalOutputs()

in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [835:956]


	public void reconcileLocalOutputs(String dataflowID) {
		/*
	for each output O

	for each variable V in predecessors(O)

	fetch all VB records for O into list OValues
	fetch all VB records for V  into list Yalues

	compare OValues and VValues:
	it SHOULD be the case that OValues is a subset of YValues. Under this assumption:

	for each vb in YValues:
	- if there is a matching o in OValues then (vb may be missing collection information)
	    copy o to vb
	  else
	    if vb has no collection info && there is a matching tree node tn  in OTree (use iteration index for the match) then
	       set vb to be in collection tb
	       copy vb to o

     finally copy all Collection records for O in OTree -- catch duplicate errors
		 */

		Map<String, String> queryConstraints = new HashMap<>();

		try {
			// for each output O
			for (Port output:pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID))  {
				// collect all VBs for O
//				String oPName = output.getPName();
//				String oVName = output.getVName();
//				queryConstraints.put("V.portName", oVName);
//				queryConstraints.put("V.processorName", oPName);
//				queryConstraints.put("VB.workflowRunId", workflowRunId);
//				queryConstraints.put("V.workflowId", topLevelDataflowID);

//				List<PortBinding> OValues = pq.getPortBindings(queryConstraints);

				// find all records for the immediate precedessor Y of O
				queryConstraints.clear();
//				queryConstraints.put("destinationPortName", output.getVName());
//				queryConstraints.put("destinationProcessorName", output.getPName());
				queryConstraints.put("destinationPortId", output.getIdentifier());
				queryConstraints.put("workflowId", output.getWorkflowId());
				List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints);

				// there can be only one -- but check that there is one!
				if (incomingDataLinks.isEmpty())
					continue;

				String sourcePname = incomingDataLinks.get(0).getSourceProcessorName();
				String sourceVname = incomingDataLinks.get(0).getSourcePortName();

				queryConstraints.clear();
				queryConstraints.put("V.portName", sourceVname);
				queryConstraints.put("V.processorName", sourcePname);
				queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
				queryConstraints.put("V.workflowId", topLevelDataflowID);

				List<PortBinding> YValues = pq.getPortBindings(queryConstraints);

				// for each YValue look for a match in OValues
				// (assume the YValues values are a superset of OValues)!)

				for (PortBinding yValue:YValues) {
					// look for a matching record in PortBinding for output O
					queryConstraints.clear();
					queryConstraints.put("V.portName", output.getPortName());
					queryConstraints.put("V.processorName", output.getProcessorName());
					queryConstraints.put("VB.workflowRunId", getWorkflowRunId());
					queryConstraints.put("V.workflowid", topLevelDataflowID);
					queryConstraints.put("VB.iteration", yValue.getIteration());
					if (yValue.getCollIDRef()!= null) {
						queryConstraints.put("VB.collIDRef", yValue.getCollIDRef());
						queryConstraints.put("VB.positionInColl", Integer.toString(yValue.getPositionInColl()));
					}
					List<PortBinding> matchingOValues = pq.getPortBindings(queryConstraints);

					// result at most size 1
					if (!matchingOValues.isEmpty()) {
						PortBinding oValue = matchingOValues.get(0);

						// copy collection info from oValue to yValue
						yValue.setCollIDRef(oValue.getCollIDRef());
						yValue.setPositionInColl(oValue.getPositionInColl());

						pw.updatePortBinding(yValue);
					} else {
						// copy the yValue to O
						// insert PortBinding back into VB with the global output portName
						yValue.setProcessorName(output.getProcessorName());
						yValue.setPortName(output.getPortName());
						pw.addPortBinding(yValue);
					}

				} // for each yValue in YValues

				// copy all Collection records for O to Y

				// get all collections refs for O
				queryConstraints.clear();
				queryConstraints.put("workflowRunId", getWorkflowRunId());
				queryConstraints.put("processorNameRef", output.getProcessorName());
				queryConstraints.put("portName", output.getPortName());

				List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints);

				// insert back as collection refs for Y -- catch duplicates
				for (NestedListNode nln:oCollections) {
					nln.setProcessorName(sourcePname);
					nln.setProcessorName(sourceVname);

					getPw().replaceCollectionRecord(nln, sourcePname, sourceVname);
				}

			} // for each output var

		} catch (SQLException e) {
			logger.warn("Problem reconciling top level outputs", e);
		}

	}