private String processDataflowStructure()

in taverna-provenanceconnector/src/main/java/org/apache/taverna/provenance/lineageservice/EventProcessor.java [272:488]


	/**
	 * Records the static structure of a dataflow in the provenance database:
	 * the workflow definition itself, each processor with its input/output
	 * ports, the dataflow-level (workflow) ports, and the datalinks between
	 * them. Recurses into nested workflows (processors whose activity is a
	 * {@link NestedDataflow}).
	 * <p>
	 * Structural records (workflow, processors, ports, links) are only written
	 * when the workflow definition is not already in the database; the run
	 * record is added unconditionally.
	 *
	 * @param df the dataflow whose structure is to be recorded
	 * @param dataflowID identifier of the workflow <em>definition</em> — NOT
	 *        the same as the workflowRunId
	 * @param externalName for a nested workflow, the local name of the
	 *        containing processor; may be {@code null} at top level
	 * @return the {@code dataflowID} that was passed in
	 */
	private String processDataflowStructure(Dataflow df, String dataflowID, String externalName) {
		String localWorkflowRunID = getWorkflowRunId();

		try {
			// Check whether we already have this workflow definition in the
			// DB; on a query failure, conservatively assume we do not.
			boolean alreadyInDb;
			try {
				List<String> workflowIds = pq.getAllworkflowIds();
				alreadyInDb = workflowIds != null && workflowIds.contains(dataflowID);
			} catch (SQLException e) {
				logger.warn("Problem processing dataflow structure for " + dataflowID, e);
				alreadyInDb = false;
			}

			// add workflow ID -- this is NOT THE SAME AS the workflowRunId

			/*
			 * this could be a nested workflow -- in this case, override its
			 * workflowRunId with that of its parent
			 */
			if (!alreadyInDb) {
				String parentDataflow = wfNestingMap.get(dataflowID);
				Blob blob = serialize(df);
				if (parentDataflow == null) {
					// this is a top level dataflow description
					pw.addWFId(dataflowID, null, externalName, blob); // set its dataflowID with no parent

				} else {
					// we are processing a nested workflow structure
					logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow);

					pw.addWFId(dataflowID, parentDataflow, externalName, blob); // set its dataflowID along with its parent

					/*
					 * Override workflowRunId to point to the top-level run.
					 * Guard against an empty run list: an unchecked get(0)
					 * here would throw and (via the enclosing catch) abort
					 * recording of the whole workflow structure.
					 */
					List<?> parentRuns = pq.getRuns(parentDataflow, null);
					if (parentRuns != null && !parentRuns.isEmpty())
						localWorkflowRunID = pq.getRuns(parentDataflow, null)
								.get(0).getWorkflowRunId();
					else
						logger.warn("No recorded run for parent dataflow "
								+ parentDataflow + "; keeping workflowRunId "
								+ localWorkflowRunID);
				}
			}
			// Log the run itself
			pw.addWorkflowRun(dataflowID, localWorkflowRunID);

			// add processors along with their variables
			List<Port> vars = new ArrayList<Port>();
			for (Processor p : df.getProcessors()) {
				String pName = p.getLocalName();

				// the processor's "type" is taken to be the class name of its
				// first activity, when there is one
				List<? extends Activity<?>> activities = p.getActivityList();

				if (! alreadyInDb) {
					ProvenanceProcessor provProc;
					String pType = null;
					if (activities != null && !activities.isEmpty())
						pType = activities.get(0).getClass().getCanonicalName();
					provProc = new ProvenanceProcessor();
					provProc.setIdentifier(UUID.randomUUID().toString());
					provProc.setProcessorName(pName);
					provProc.setFirstActivityClassName(pType);
					provProc.setWorkflowId(dataflowID);
					provProc.setTopLevelProcessor(false);

					pw.addProcessor(provProc);

					/*
					 * add all input ports for this processor as input variables
					 */
					for (ProcessorInputPort ip : p.getInputPorts()) {
						Port inputVar = new Port();
						inputVar.setIdentifier(UUID.randomUUID().toString());
						inputVar.setProcessorId(provProc.getIdentifier());
						inputVar.setProcessorName(pName);
						inputVar.setWorkflowId(dataflowID);
						inputVar.setPortName(ip.getName());
						inputVar.setDepth(ip.getDepth());
						inputVar.setInputPort(true);
					 	vars.add(inputVar);
					}

					/*
					 * add all output ports for this processor as output
					 * variables
					 */
					for (ProcessorOutputPort op : p.getOutputPorts()) {
						Port outputVar = new Port();
						outputVar.setIdentifier(UUID.randomUUID().toString());
						outputVar.setProcessorName(pName);
						outputVar.setProcessorId(provProc.getIdentifier());
						outputVar.setWorkflowId(dataflowID);
						outputVar.setPortName(op.getName());
						outputVar.setDepth(op.getDepth());
						outputVar.setInputPort(false);

						vars.add(outputVar);
					}
				}

				/*
				 * check for nested structures: if the activity is
				 * DataflowActivity then this processor is a nested workflow;
				 * make an entry into wfNesting map with its ID and recurse on
				 * the nested workflow
				 */

				if (activities != null)
					for (Activity<?> a : activities) {
						if (!(a instanceof NestedDataflow))
							continue;

						Dataflow nested = ((NestedDataflow) a)
								.getNestedDataflow();
						wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent

						// RECURSIVE CALL
						processDataflowStructure(nested,
								nested.getIdentifier(), p.getLocalName());
					}
			} // end for each processor

			// add inputs to entire dataflow
			String pName = INPUT_CONTAINER_PROCESSOR;  // overridden -- see below

			/*
			 * check whether we are processing a nested workflow. in this case
			 * the input vars are not assigned to the INPUT processor but to the
			 * containing dataflow
			 */
			if (! alreadyInDb) {
				if (externalName != null) // override the default if we are nested or an external name is provided
					pName = externalName;

				for (DataflowInputPort ip : df.getInputPorts()) {
					Port inputVar = new Port();
					inputVar.setIdentifier(UUID.randomUUID().toString());
					inputVar.setProcessorId(null); // meaning workflow port
					inputVar.setProcessorName(pName);
					inputVar.setWorkflowId(dataflowID);
					inputVar.setPortName(ip.getName());
					inputVar.setDepth(ip.getDepth());
					inputVar.setInputPort(true);  // CHECK PM modified 11/08 -- input vars are actually outputs of input processors...

					vars.add(inputVar);
				}

				// add outputs of entire dataflow
				pName = OUTPUT_CONTAINER_PROCESSOR;  // overridden -- see below

				/*
				 * check whether we are processing a nested workflow. in this
				 * case the output vars are not assigned to the OUTPUT processor
				 * but to the containing dataflow
				 */
				if (externalName != null) // we are nested
					pName = externalName;

				for (DataflowOutputPort op : df.getOutputPorts()) {
					Port outputVar = new Port();
					outputVar.setIdentifier(UUID.randomUUID().toString());
					outputVar.setProcessorId(null); // meaning workflow port
					outputVar.setProcessorName(pName);
					outputVar.setWorkflowId(dataflowID);
					outputVar.setPortName(op.getName());
					outputVar.setDepth(op.getDepth());
					outputVar.setInputPort(false);  // CHECK PM modified 11/08 -- output vars are actually outputs of output processors...
					vars.add(outputVar);
				}

				pw.addPorts(vars, dataflowID);
				makePortMapping(vars);

				/*
				 * add datalink records using the dataflow links retrieving the
				 * processor names requires navigating from links to source/sink
				 * and from there to the processors
				 */
				for (Datalink l : df.getLinks()) {
					// TODO cover the case of datalinks from an input and to an output to the entire dataflow

					Port sourcePort = null;
					Port destinationPort = null;

					OutputPort source = l.getSource();
					if (source instanceof ProcessorOutputPort) {
						String sourcePname = ((ProcessorOutputPort) source)
								.getProcessor().getLocalName();
						sourcePort = lookupPort(sourcePname, source.getName(), false);
					} else if (source instanceof MergeOutputPort) {
						// TODO: Handle merge output ports
					} else
						// Assume it is internal port from DataflowInputPort
						sourcePort = lookupPort(externalName, source.getName(), true);

					InputPort sink = l.getSink();
					if (sink instanceof ProcessorInputPort) {
						String sinkPname = ((ProcessorInputPort) sink)
								.getProcessor().getLocalName();
						destinationPort = lookupPort(sinkPname, sink.getName(), true);
					} else if (sink instanceof MergeInputPort) {
						// TODO: Handle merge input ports
					} else
						// Assume it is internal port from DataflowOutputPort
						destinationPort = lookupPort(externalName, sink.getName(), false);

					if (sourcePort != null && destinationPort != null)
						pw.addDataLink(sourcePort, destinationPort, dataflowID);
					else
						logger.info("Can't record datalink " + l);
				}
			}
		} catch (Exception e) {
			// Boundary catch: provenance recording must never break the
			// workflow run itself, so log and continue.
			logger.error("Problem processing provenance for dataflow", e);
		}

		return dataflowID;
	}