private void xformStep()

in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/ProvenanceAnalysis.java [474:802]


	/**
	 * Performs one "xform" step of the lineage traversal. It starts from an
	 * output port of processor {@code proc} and moves back to that processor's
	 * input ports. It projects the output's iteration path onto each input,
	 * and generates and runs the lineage queries for this step when the
	 * processor is of interest. It optionally updates the OPM graph, then
	 * recurses through {@code xferStep} for every input port.
	 * <p>
	 * Two cases are treated as processors that do nothing: a dataflow port
	 * (of the entire dataflow or of a subdataflow), and the output container
	 * pseudo-processor. Their only "input" is the output port itself, so the
	 * following xfer step traces back to the next processor <em>within</em>
	 * {@code proc}. In this way the traversal passes seamlessly over
	 * intermediate I/O that is part of nested dataflows.
	 * <p>
	 * {@code proc} is pushed onto {@code currentPath} for the duration of the
	 * step. It is popped again on exit, including when an exception is thrown.
	 *
	 * @param workflowRunId the workflow run whose provenance is being traced
	 * @param workflowId the workflow that {@code proc} belongs to
	 * @param outputVar the output port being traced back
	 * @param proc name of the processor (or dataflow) owning {@code outputVar}
	 * @param path iteration path of the output value, as {@code x,y,...} or
	 *            {@code [x,y,...]}; {@code null} when there is no path
	 * @param selectedProcessors processors of interest; when empty, every
	 *            processor is of interest
	 * @param lqList accumulator to which the generated lineage queries are
	 *            appended
	 * @throws SQLException if a provenance database query fails
	 */
	private void xformStep(
			String workflowRunId,
			String workflowId,
			Port outputVar, // we need the dnl from this output var
			String proc, String path,
			List<ProvenanceProcessor> selectedProcessors,
			List<LineageSQLQuery> lqList) throws SQLException {
		boolean procIsDataflow = getPq().isDataflow(proc);
		List<Port> inputVars = xformInputPorts(workflowId, proc,
				procIsDataflow, outputVar);
		// maps each input var to its projected path
		Map<Port, String> var2Path = xformProjectPaths(path, inputVars);

		// accumulate this proc to current path; undone in the finally below
		currentPath.add(proc);
		try {
			boolean isSelected = xformRecordSelectedPath(workflowId, proc,
					selectedProcessors);

			/*
			 * Only transformations important to the user produce output and
			 * an OPM graph fragment.
			 * NOTE(review): the recursion into xferStep is also skipped for
			 * unselected processors, so with a non-empty selection the
			 * traversal stops at the first unselected processor. Confirm that
			 * this is intended.
			 */
			if (!selectedProcessors.isEmpty() && !isSelected)
				return;

			/*
			 * Generate SQL for all input vars, based on the current path. The
			 * projected paths determine the level in the collection at which
			 * we look at the value assignment.
			 */
			List<LineageSQLQuery> newLqList = getPq().lineageQueryGen(
					workflowRunId, proc, var2Path, outputVar, path,
					isReturnOutputs() || var2Path.isEmpty());
			lqList.addAll(newLqList);

			// create OPM artifact, role and process for the output var of this xform
			boolean doOPM = aOPMManager != null && aOPMManager.isActive();
			if (doOPM)
				doOPM = xformRecordOutputInOPM(workflowRunId, workflowId,
						outputVar, proc, path, procIsDataflow);

			for (LineageSQLQuery lq : newLqList) {
				// the query is always run; its results feed the OPM graph only when doOPM holds
				Dependencies inputs = getPq().runLineageQuery(lq,
						isIncludeDataValue());
				if (doOPM)
					xformRecordInputsInOPM(inputs);
			}

			// recursion -- xfer path is next up
			for (Port inputVar : inputVars)
				xferStep(workflowRunId, workflowId, inputVar,
						var2Path.get(inputVar), selectedProcessors, lqList);
		} finally {
			currentPath.remove(currentPath.size() - 1);
		}
	}  // end xformStep

	/**
	 * Returns the ports whose values flow into {@code proc} for this step.
	 * <p>
	 * For a dataflow, or for the output container pseudo-processor, this is
	 * just {@code outputVar} itself. Otherwise it is the input ports of
	 * {@code proc} in {@code workflowId}, as returned by the provenance query
	 * layer.
	 */
	private List<Port> xformInputPorts(String workflowId, String proc,
			boolean procIsDataflow, Port outputVar) throws SQLException {
		// both cases currently take the same action, but this may change in the future
		if (procIsDataflow || proc.equals(OUTPUT_CONTAINER_PROCESSOR)) {
			List<Port> passThrough = new ArrayList<>();
			passThrough.add(outputVar);
			return passThrough;
		}

		Map<String, String> varsQueryConstraints = new HashMap<>();
		varsQueryConstraints.put("W.workflowId", workflowId);
		varsQueryConstraints.put("processorName", proc);
		varsQueryConstraints.put("isInputPort", "1");
		return getPq().getPorts(varsQueryConstraints);
	}

	/**
	 * Splits the comma-separated iteration {@code path} among the input
	 * ports.
	 * <p>
	 * Each port consumes its iteration delta (resolved depth minus declared
	 * depth) worth of consecutive indices, in the order of {@code inputVars}.
	 * Some ports map to {@code null} instead of a projected path:
	 * <ul>
	 * <li>ports with no iteration;</li>
	 * <li>all ports, when {@code path} is null;</li>
	 * <li>all ports, when {@code path} is shorter than the summed deltas.</li>
	 * </ul>
	 *
	 * @return a map with one entry per input port
	 */
	private static Map<Port, String> xformProjectPaths(String path,
			List<Port> inputVars) {
		Map<Port, String> var2Path = new HashMap<>();
		if (path == null) { // nothing to split
			for (Port inputVar : inputVars)
				var2Path.put(inputVar, null);
			return var2Path;
		}

		// 24/7/08 get DNL (declared nesting level) and ANL (actual nesting level) from VAR
		Map<Port, Integer> var2delta = new HashMap<>();
		int minPathLength = 0; // if input path is shorter than this we give up granularity altogether
		for (Port inputVar : inputVars) {
			int resolvedDepth = 0;
			if (inputVar.getResolvedDepth() != null)
				resolvedDepth = inputVar.getResolvedDepth();
			/*
			 * Clamp to zero. A negative delta means the port does not iterate
			 * over the path. This happens, for example, when the resolved
			 * depth is unknown (taken as 0) for a port with a positive
			 * declared depth. Letting a negative value through would break the
			 * index arithmetic below.
			 */
			int delta = Math.max(0, resolvedDepth - inputVar.getDepth());
			var2delta.put(inputVar, delta);
			minPathLength += delta;
		}

		String[] iterationVector = path.split(",");
		if (iterationVector.length < minPathLength) { // no path is propagated
			for (Port inputVar : inputVars)
				var2Path.put(inputVar, null);
			return var2Path;
		}

		// compute projected paths
		// TODO account for empty paths
		int start = 0;
		for (Port inputVar : inputVars) {
			int delta = var2delta.get(inputVar);
			if (delta == 0) { // associate empty path to this var
				var2Path.put(inputVar, null);
				continue;
			}

			// this var is involved in iteration: it takes the next delta indices
			StringBuilder iterationFragment = new StringBuilder();
			for (int i = start; i < start + delta; i++) {
				if (i > start)
					iterationFragment.append(',');
				iterationFragment.append(iterationVector[i]);
			}
			start += delta;
			var2Path.put(inputVar, iterationFragment.toString());
		}
		return var2Path;
	}

	/**
	 * Records the current path for {@code proc} if it is a selected
	 * processor.
	 * <p>
	 * If {@code <workflowId, proc>} is one of {@code selectedProcessors}, a
	 * copy of {@code currentPath} is added to that processor's list in
	 * {@code validPaths}. Dataflow entries are removed from the copy.
	 *
	 * @return whether {@code proc} is a selected processor
	 */
	private boolean xformRecordSelectedPath(String workflowId, String proc,
			List<ProvenanceProcessor> selectedProcessors) throws SQLException {
		for (ProvenanceProcessor pp : selectedProcessors) {
			if (!pp.getWorkflowId().equals(workflowId)
					|| !pp.getProcessorName().equals(proc))
				continue;

			// NOTE(review): assumes validPaths holds a list for every selected processor
			List<List<String>> paths = validPaths.get(pp);

			// copy the path since the original will change;
			// also remove spurious dataflow processors at this point
			List<String> pathCopy = new ArrayList<>();
			for (String s : currentPath)
				if (!getPq().isDataflow(s))
					pathCopy.add(s);
			paths.add(pathCopy);
			return true;
		}
		return false;
	}

	/**
	 * Records the value bound to {@code outputVar} in this run (at
	 * {@code path}, if given) in the OPM graph. It adds:
	 * <ul>
	 * <li>an artifact and a role, unless {@code proc} is a dataflow;</li>
	 * <li>the process node for {@code proc};</li>
	 * <li>a generatedBy assertion.</li>
	 * </ul>
	 * Only the first matching port binding is used.
	 *
	 * @return {@code false} if no port binding matched, so OPM recording for
	 *         this step should stop; {@code true} otherwise
	 */
	private boolean xformRecordOutputInOPM(String workflowRunId,
			String workflowId, Port outputVar, String proc, String path,
			boolean procIsDataflow) throws SQLException {
		// fetch value for this variable and assert it as an Artifact in the OPM graph
		Map<String, String> vbConstraints = new HashMap<>();
		vbConstraints.put("VB.processorNameRef", outputVar.getProcessorName());
		vbConstraints.put("VB.portName", outputVar.getPortName());
		vbConstraints.put("VB.workflowRunId", workflowRunId);
		if (path != null) {
			/*
			 * account for x,y,.. format as well as [x,y,...] depending on
			 * where the request is coming from
			 */
			// TODO this is just irritating must be removed
			if (path.startsWith("["))
				vbConstraints.put("VB.iteration", path);
			else
				vbConstraints.put("VB.iteration", "[" + path + "]");
		}

		List<PortBinding> vbList = getPq().getPortBindings(vbConstraints); // DB
		if (vbList == null || vbList.isEmpty()) {
			logger.debug("no entry corresponding to conditions: proc="
					+ outputVar.getProcessorName() + " var = "
					+ outputVar.getPortName() + " iteration = " + path);
			return false;
		}

		// use only the first result (expect only one)
		PortBinding vb = vbList.get(0);
		if (!procIsDataflow) {
			xformAddArtifact(vb.getValue());
			aOPMManager.createRole(vb.getWorkflowRunId(), vb.getWorkflowId(),
					vb.getProcessorName(), vb.getIteration());
		}

		/*
		 * assert proc as Process -- include iteration vector to separate
		 * different activations of the same process
		 */
		try {
			aOPMManager.addProcess(proc, vb.getIteration(), workflowId,
					vb.getWorkflowRunId());
		} catch (ProvenanceException e) {
			logger.warn("Could not add process", e);
		}

		/*
		 * Create the OPM generatedBy property between the output value and
		 * this process node, avoiding the pathological case where a dataflow
		 * generates its own inputs.
		 * NOTE(review): when procIsDataflow, no artifact or role is added
		 * above. The "current" artifact and role are then whatever an earlier
		 * call left behind. Confirm that this is intended.
		 */
		try {
			aOPMManager.assertGeneratedBy(aOPMManager.getCurrentArtifact(),
					aOPMManager.getCurrentProcess(),
					aOPMManager.getCurrentRole(),
					aOPMManager.getCurrentAccount(), true);
		} catch (ProvenanceException e) {
			logger.warn("Could not add assertion", e);
		}
		return true;
	}

	/**
	 * Records the input-port values of a lineage query result in the OPM
	 * graph.
	 * <p>
	 * Each input record gets three things:
	 * <ul>
	 * <li>an artifact: the collection reference for collections, otherwise
	 * the value;</li>
	 * <li>a role;</li>
	 * <li>a "used" assertion linking it to the current process.</li>
	 * </ul>
	 * Output-port records are skipped, since asserting them would claim that
	 * the process used one of its own outputs.
	 */
	private void xformRecordInputsInOPM(Dependencies inputs)
			throws SQLException {
		for (LineageQueryResultRecord resultRecord : inputs.getRecords()) {
			if (!resultRecord.isInputPort()) // process inputs only
				continue;

			// create OPM artifact for the input var obtained by path projection
			if (resultRecord.isCollection()) {
				try {
					aOPMManager.addArtifact(resultRecord
							.getCollectionT2Reference());
				} catch (ProvenanceException e) {
					logger.warn("Could not add artifact", e);
				}
			} else {
				xformAddArtifact(resultRecord.getValue());
			}

			// role and "used" are made for every input artifact, whichever branch created it
			aOPMManager.createRole(resultRecord.getWorkflowRunId(),
					resultRecord.getworkflowId(),
					resultRecord.getProcessorName(),
					resultRecord.getIteration());
			try {
				// true -> prevent duplicates CHECK
				aOPMManager.assertUsed(aOPMManager.getCurrentArtifact(),
						aOPMManager.getCurrentProcess(),
						aOPMManager.getCurrentRole(),
						aOPMManager.getCurrentAccount(), true);
			} catch (ProvenanceException e) {
				logger.warn("Could not add assertion", e);
			}
		}
	}

	/**
	 * Adds {@code value} as an OPM artifact.
	 * <p>
	 * When {@code isRecordArtifactValues()} holds, the value is first
	 * dereferenced through the reference service, and the rendered data is
	 * attached to the artifact. A failure to add the artifact is logged, not
	 * propagated.
	 */
	private void xformAddArtifact(String value) {
		if (!isRecordArtifactValues()) {
			try {
				aOPMManager.addArtifact(value);
			} catch (ProvenanceException e) {
				logger.warn("Could not add artifact", e);
			}
			return;
		}

		T2Reference ref = getInvocationContext().getReferenceService()
				.referenceFromString(value);
		Object data = ic.getReferenceService().renderIdentifier(ref,
				Object.class, ic);
		logger.debug("deref value for ref: " + ref + " " + data
				+ " of class "
				+ (data == null ? "null" : data.getClass().getName()));
		try {
			aOPMManager.addArtifact(value, data);
		} catch (ProvenanceException e) {
			logger.warn("Could not add artifact", e);
		}
	}