app/models/provenance.rb (434 lines of code) (raw):

require 'sparql' # query the graph require 'uri' # used to decode urls class Provenance # TODO: try to read the prefixes from the file @@prefixes = "PREFIX dc: <http://purl.org/dc/elements/1.1/> PREFIX prov: <http://www.w3.org/ns/prov#> PREFIX cnt: <http://www.w3.org/2011/content#> PREFIX foaf: <http://xmlns.com/foaf/0.1/> PREFIX dcmitype: <http://purl.org/dc/dcmitype/> PREFIX wfprov: <http://purl.org/wf4ever/wfprov#> PREFIX dcam: <http://purl.org/dc/dcam/> PREFIX xml: <http://www.w3.org/XML/1998/namespace> PREFIX vs: <http://www.w3.org/2003/06/sw-vocab-status/ns#> PREFIX dcterms: <http://purl.org/dc/terms/> PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> PREFIX wot: <http://xmlns.com/wot/0.1/> PREFIX wfdesc: <http://purl.org/wf4ever/wfdesc#> PREFIX dct: <http://purl.org/dc/terms/> PREFIX tavernaprov: <http://ns.taverna.org.uk/2012/tavernaprov/> PREFIX owl: <http://www.w3.org/2002/07/owl#> PREFIX xsd: <http://www.w3.org/2001/XMLSchema#> PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> PREFIX skos: <http://www.w3.org/2004/02/skos/core#> PREFIX scufl2: <http://ns.taverna.org.uk/2010/scufl2#> " cattr_reader :prefixes attr_reader :graph @file = '' #constructor def initialize(filepath) @file = filepath @graph = RDF::Graph.new RDF::Reader.open("#{@file}") do |reader| reader.each_statement do |statement| @graph.insert(statement) end end end #Extract all the workflows and their parent workflow def getAllWorkflowRuns # create the query sparql_query = SPARQL.parse("#{Provenance.prefixes} SELECT * WHERE { ?workflowRun rdf:type wfprov:WorkflowRun ; rdfs:label ?workflowRunLabel . OPTIONAL { ?workflowRun wfprov:wasPartOfWorkflowRun ?wasPartOfWorkflowRun . ?wasPartOfWorkflowRun rdfs:label ?wasPartOfWorkflowRunLabel . FILTER NOT EXISTS { ?something foaf:primaryTopic ?wasPartOfWorkflowRun } } OPTIONAL { { ?workflowRun wfprov:usedInput ?usedDictionaryInput . ?usedDictionaryInput rdf:type prov:Dictionary } UNION { ?workflowRun wfprov:usedInput ?usedArtifactInput FILTER NOT EXISTS { ?usedArtifactInput rdf:type prov:Dictionary } } } FILTER NOT EXISTS { ?something foaf:primaryTopic ?workflowRun } }") #return the result of the performing the query sparql_query.execute(graph) end # Get all the ProcessRuns and their outlinks def getAllProcessRuns sparql_query = SPARQL.parse("#{Provenance.prefixes} SELECT * WHERE { ?processURI rdf:type wfprov:ProcessRun ; prov:startedAtTime ?startedAtTime ; prov:endedAtTime ?endedAtTime ; wfprov:wasEnactedBy ?engineUsed ; rdfs:label ?processLabel OPTIONAL { ?processURI wfprov:wasPartOfWorkflowRun ?wasPartOfWorkflow . ?wasPartOfWorkflow rdfs:label ?wasPartOfWorkflowLabel . FILTER NOT EXISTS { ?something foaf:primaryTopic ?wasPartOfWorkflow } } OPTIONAL { { ?processURI wfprov:usedInput ?usedDictionaryInput . ?usedDictionaryInput rdf:type prov:Dictionary } UNION { ?processURI wfprov:usedInput ?usedArtifactInput FILTER NOT EXISTS { ?usedArtifactInput rdf:type prov:Dictionary } } } }") # return the processes that were used sparql_query.execute(graph) end #Extract all the workflows and their parent workflow def getAllArtifacts # create the query sparql_query = SPARQL.parse("#{Provenance.prefixes} SELECT * WHERE { { ?artifactURI rdf:type wfprov:Artifact ; wfprov:describedByParameter ?describedByParameter . ?describedByParameter rdfs:comment ?comment OPTIONAL { ?artifactURI tavernaprov:content ?filepath } OPTIONAL { ?artifactURI wfprov:wasOutputFrom ?outputFromWorkflowRun . ?outputFromWorkflowRun rdf:type wfprov:WorkflowRun ; rdfs:label ?outputFromWorkflowRunLabel . FILTER NOT EXISTS { ?something foaf:primaryTopic ?outputFromWorkflowRun } } OPTIONAL { ?artifactURI wfprov:wasOutputFrom ?outputFromProcessRun . ?outputFromProcessRun rdf:type wfprov:ProcessRun ; prov:startedAtTime ?startedAtTime ; prov:endedAtTime ?endedAtTime ; rdfs:label ?outputFromProcessRunLabel } FILTER NOT EXISTS { ?artifactURI rdf:type prov:Dictionary } } UNION { ?dictionary rdf:type prov:Dictionary OPTIONAL { ?dictionary tavernaprov:content ?filepath } OPTIONAL { { ?dictionary prov:hadMember ?hadMemberDictionary . ?hadMemberDictionary rdf:type prov:Dictionary . } UNION { ?dictionary prov:hadMember ?hadMemberArtifact . ?hadMemberArtifact wfprov:describedByParameter ?describedByParameter . ?describedByParameter rdfs:comment ?comment . FILTER NOT EXISTS { ?hadMemberArtifact rdf:type prov:Dictionary } } } OPTIONAL { ?dictionary wfprov:wasOutputFrom ?outputFromWorkflowRun . ?outputFromWorkflowRun rdf:type wfprov:WorkflowRun ; rdfs:label ?outputFromWorkflowRunLabel . FILTER NOT EXISTS { ?something foaf:primaryTopic ?outputFromWorkflowRun } } OPTIONAL { ?dictionary wfprov:wasOutputFrom ?outputFromProcessRun . ?outputFromProcessRun rdf:type wfprov:ProcessRun ; prov:startedAtTime ?startedAtTime ; prov:endedAtTime ?endedAtTime ; rdfs:label ?outputFromProcessRunLabel } } }") # return the result of the performing the query sparql_query.execute(graph) end def getContentOf(extractedFilepath) content = "" if File.directory?(extractedFilepath) content = "[" # ffs = Files or Folders #for each folder/file inside this folder do ffs = Dir.glob(extractedFilepath + "/*") for file in ffs content = content + getContentOf("#{file}") + ", " end content = content[0...-2] + "]" elsif File.file?(extractedFilepath) content = File.read(extractedFilepath) end content end def to_dataHashObject(bundle_filepath) nodes = [] links = [] linkValue = 50 processorTrimCount = "Processor execution ".length workflowRunTrimCount = "Workflow run of ".length # get all the workflows getAllWorkflowRuns.each do |result| # get the name workflowRunURI = result["workflowRun"].to_s workflowRunLabel = result["workflowRunLabel"].to_s if workflowRunLabel[0] == "W" workflowRunLabel = workflowRunLabel[workflowRunTrimCount, workflowRunLabel.length] elsif workflowRunLabel[0] == "P" workflowRunLabel = workflowRunLabel[processorTrimCount, workflowRunLabel.length] end # a temp node for current (Decide whether to be added or not) workflowRun = {:name => workflowRunURI, :type => "Workflow Run", :label => workflowRunLabel} # see if exists indexSource = nodes.find_index(workflowRun) # check if indexSource.blank? indexSource = nodes.count nodes << workflowRun end # check if has property wasPartOfWorkflowRun if result["wasPartOfWorkflowRun"].present? secondWorkflowRunLabel = result["wasPartOfWorkflowRunLabel"].to_s if secondWorkflowRunLabel[0] == "W" secondWorkflowRunLabel = secondWorkflowRunLabel[workflowRunTrimCount, secondWorkflowRunLabel.length] elsif secondWorkflowRunLabel[0] == "P" secondWorkflowRunLabel = secondWorkflowRunLabel[processorTrimCount, secondWorkflowRunLabel.length] end secondWorkflowRun = {:name => result["wasPartOfWorkflowRun"].to_s, :type => "Workflow Run", :label => secondWorkflowRunLabel} indexTarget = nodes.find_index(secondWorkflowRun) if indexTarget.blank? indexTarget = nodes.count nodes << secondWorkflowRun end # add the link linkWfToWf = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkWfToWf).blank? links << linkWfToWf end end # check if has property usedInput if result["usedArtifactInput"].present? artifact = {:name => result["usedArtifactInput"].to_s, :type => "Artifact" } indexTarget = nodes.find_index(artifact) if indexTarget.blank? indexTarget = nodes.count nodes << artifact end # add the link linkProcessToArtifact = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkProcessToArtifact).blank? links << linkProcessToArtifact end end # check if has property usedInput if result["usedDictionaryInput"].present? dictionary = {:name => result["usedDictionaryInput"].to_s, :type => "Dictionary" } indexTarget = nodes.find_index(artifact) if indexTarget.blank? indexTarget = nodes.count nodes << dictionary end # add the link linkProcessToArtifact = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkProcessToArtifact).blank? links << linkProcessToArtifact end end end # get all the processes # get all the workflows getAllProcessRuns.each do |result| # get the name processRunURI = result["processURI"].to_s processRunLabel = result["processLabel"].to_s # a temp node for current (Decide whether to be added or not) processRun = {:name => processRunURI, :type => "Process Run", :startedAtTime => result["startedAtTime"].to_s, :endedAtTime =>result["endedAtTime"].to_s, :label => processRunLabel[processorTrimCount, processRunLabel.length]} # see if exists indexSource = nodes.find_index(processRun) # check if indexSource.blank? indexSource = nodes.count nodes << processRun end # check if has property wasPartOfWorkflow if result["wasPartOfWorkflow"].present? workflowRunLabel = result["wasPartOfWorkflowLabel"].to_s if workflowRunLabel[0] == "W" workflowRunLabel = workflowRunLabel[workflowRunTrimCount, workflowRunLabel.length] elsif workflowRunLabel[0] == "P" workflowRunLabel = workflowRunLabel[processorTrimCount, workflowRunLabel.length] end workflowRun = {:name => result["wasPartOfWorkflow"].to_s, :type => "Workflow Run", :label => workflowRunLabel} indexTarget = nodes.find_index(workflowRun) if indexTarget.blank? indexTarget = nodes.count nodes << workflowRun end # add the link linkProcessToWf = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkProcessToWf).blank? links << linkProcessToWf end end # check if has property usedInput if result["usedArtifactInput"].present? artifact = {:name => result["usedArtifactInput"].to_s, :type => "Artifact" } indexTarget = nodes.find_index(artifact) if indexTarget.blank? indexTarget = nodes.count nodes << artifact end # add the link linkProcessToArtifact = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkProcessToArtifact).blank? links << linkProcessToArtifact end end # check if has property usedInput if result["usedDictionaryInput"].present? dictionary = {:name => result["usedDictionaryInput"].to_s, :type => "Dictionary" } indexTarget = nodes.find_index(artifact) if indexTarget.blank? indexTarget = nodes.count nodes << dictionary end # add the link linkProcessToArtifact = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkProcessToArtifact).blank? links << linkProcessToArtifact end end # # check if has property engineUsed which represents the wfprov:wasEnactedBy # if result["engineUsed"].present? # engine = {:name => result["engineUsed"].to_s, :type => "Engine"} # indexTarget = nodes.find_index(engine) # if indexTarget.blank? # indexTarget = nodes.count # nodes << engine # end # # add the link # linkProcessToEngine = {:source => indexTarget, :target => indexSource, :value => linkValue} # if links.find_index(linkProcessToEngine).blank? # links << linkProcessToEngine # end # end end # get all the nodes and links related to the artifact getAllArtifacts.each do |result| if result["artifactURI"].present? # get the name artifactURI = result["artifactURI"].to_s # the node that needs to be added to the nodes artifact = {:name => artifactURI, :type => "Artifact"} else # get the name artifactURI = result["dictionary"].to_s # the node that needs to be added to the nodes artifact = {:name => artifactURI, :type => "Dictionary"} end # get the index of the artifact if present otherwise nil indexSource = -1 nodes.each_with_index do |node, index| if node[:type].to_s == artifact[:type].to_s if node[:name].to_s == artifact[:name].to_s indexSource = index artifactLabel = "List" if result["comment"].present? artifactLabel = result["comment"].to_s end if node[:label].present? and node[:label] != "List" node[:label] = node[:label] + "\\n" + artifactLabel else node.merge!(:label => artifactLabel) end if !(node[:content].present?) and result["filepath"].present? artifactContent = getContentOf("#{bundle_filepath}#{result["filepath"].to_s}") node[:content] = artifactContent end end end end # check if is already in the list if not add to nodes if indexSource == -1 indexSource = nodes.count artifactLabel = "List" if result["comment"].present? artifactLabel = result["comment"].to_s end artifact[:label] = artifactLabel artifactContent = "" if result["filepath"].present? artifactContent = getContentOf("#{bundle_filepath}#{result["filepath"].to_s}") artifact[:content] = artifactContent end nodes << artifact end # check if it has the property wasOutputFrom a process Run and add a link entity-process if result["outputFromProcessRun"].present? processRunLabel = result["outputFromProcessRunLabel"].to_s processRun = {:name => result["outputFromProcessRun"].to_s, :type => "Process Run", :startedAtTime => result["startedAtTime"].to_s, :endedAtTime =>result["endedAtTime"].to_s, :label => processRunLabel[processorTrimCount, processRunLabel.length]} indexTarget = nodes.find_index(processRun) if indexTarget.blank? indexTarget = nodes.count nodes << processRun end # add the link linkArtifactToProcess = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkArtifactToProcess).blank? links << linkArtifactToProcess end end if result["outputFromWorkflowRun"].present? workflowRunLabel = result["outputFromWorkflowRunLabel"].to_s if workflowRunLabel[0] == "W" workflowRunLabel = workflowRunLabel[workflowRunTrimCount, workflowRunLabel.length] elsif workflowRunLabel[0] == "P" workflowRunLabel = workflowRunLabel[processorTrimCount, workflowRunLabel.length] end workflowRun = {:name => result["outputFromWorkflowRun"].to_s, :type => "Workflow Run", :label => workflowRunLabel} indexTarget = nodes.find_index(workflowRun) if indexTarget.blank? indexTarget = nodes.count nodes << workflowRun end # add the link linkArtifactToWorkflow = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkArtifactToWorkflow).blank? links << linkArtifactToWorkflow end end if result["hadMemberArtifact"].present? memberArtifact = {:name => result["hadMemberArtifact"].to_s, :type => "Artifact"} indexTarget = -1 nodes.each_with_index do |node, index| if node[:type].to_s == memberArtifact[:type].to_s if node[:name].to_s == memberArtifact[:name].to_s indexTarget = index end end end if indexTarget == -1 if result["comment"].present? artifactLabel = result["comment"].to_s memberArtifact.merge!(:label => artifactLabel) end indexTarget = nodes.count nodes << memberArtifact end # add the link if result["outputFromProcessRun"].present? linkDictToArtifact = {:source => indexSource, :target => indexTarget, :value => linkValue} if links.find_index(linkDictToArtifact).blank? links << linkDictToArtifact end else linkDictToArtifact = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkDictToArtifact).blank? links << linkDictToArtifact end end end if result["hadMemberDictionary"].present? dictionary = {:name => result["hadMemberDictionary"].to_s, :type => "Dictionary"} indexTarget = -1 nodes.each_with_index do |node, index| if node[:type].to_s == dictionary[:type].to_s if node[:name].to_s == dictionary[:name].to_s indexTarget = index end end end if indexTarget == -1 # if result["comment"].present? # artifactLabel = result["comment"].to_s # memberArtifact.merge!(:label => artifactLabel) # end indexTarget = nodes.count nodes << dictionary end # add the link linkDictToDict = {:source => indexTarget, :target => indexSource, :value => linkValue} if links.find_index(linkDictToDict).blank? links << linkDictToDict end end end # make a hash to return stream = {:nodes => nodes, :links => links } # return stream stream end # persisted is important not to get "undefined method `to_key' for" error def persisted? false end end