public void verify()

in tez-api/src/main/java/org/apache/tez/dag/api/DAG.java [331:450]


  public void verify(boolean restricted) throws IllegalStateException {
    if (vertices.isEmpty()) {
      throw new IllegalStateException("Invalid dag containing 0 vertices");
    }

    processEdgesAndGroups();
    
    // check for valid vertices, duplicate vertex names,
    // and prepare for cycle detection
    Map<String, AnnotatedVertex> vertexMap = new HashMap<String, AnnotatedVertex>();
    Map<Vertex, Set<String>> inboundVertexMap = new HashMap<Vertex, Set<String>>();
    Map<Vertex, Set<String>> outboundVertexMap = new HashMap<Vertex, Set<String>>();
    for (Vertex v : vertices.values()) {
      if (vertexMap.containsKey(v.getName())) {
        throw new IllegalStateException("DAG contains multiple vertices"
          + " with name: " + v.getName());
      }
      vertexMap.put(v.getName(), new AnnotatedVertex(v));
    }

    Map<Vertex, List<Edge>> edgeMap = new HashMap<Vertex, List<Edge>>();
    for (Edge e : edges) {
      // Construct structure for cycle detection
      Vertex inputVertex = e.getInputVertex();
      Vertex outputVertex = e.getOutputVertex();      
      List<Edge> edgeList = edgeMap.get(inputVertex);
      if (edgeList == null) {
        edgeList = new ArrayList<Edge>();
        edgeMap.put(inputVertex, edgeList);
      }
      edgeList.add(e);
      
      // Construct map for Input name verification
      Set<String> inboundSet = inboundVertexMap.get(outputVertex);
      if (inboundSet == null) {
        inboundSet = new HashSet<String>();
        inboundVertexMap.put(outputVertex, inboundSet);
      }
      inboundSet.add(inputVertex.getName());
      
      // Construct map for Output name verification
      Set<String> outboundSet = outboundVertexMap.get(inputVertex);
      if (outboundSet == null) {
        outboundSet = new HashSet<String>();
        outboundVertexMap.put(inputVertex, outboundSet);
      }
      outboundSet.add(outputVertex.getName());
    }

    // check input and output names don't collide with vertex names
    for (Vertex vertex : vertices.values()) {
      for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
        if (vertexMap.containsKey(input.getName())) {
          throw new IllegalStateException("Vertex: "
              + vertex.getName()
              + " contains an Input with the same name as vertex: "
              + input.getName());
        }
      }
      for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
        if (vertexMap.containsKey(output.getName())) {
          throw new IllegalStateException("Vertex: "
              + vertex.getName()
              + " contains an Output with the same name as vertex: "
              + output.getName());
        }
      }
    }

    // Check for valid InputNames
    for (Entry<Vertex, Set<String>> entry : inboundVertexMap.entrySet()) {
      Vertex vertex = entry.getKey();
      for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
        if (entry.getValue().contains(input.getName())) {
          throw new IllegalStateException("Vertex: "
              + vertex.getName()
              + " contains an incoming vertex and Input with the same name: "
              + input.getName());
        }
      }
    }

    // Check for valid OutputNames
    for (Entry<Vertex, Set<String>> entry : outboundVertexMap.entrySet()) {
      Vertex vertex = entry.getKey();
      for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
        if (entry.getValue().contains(output.getName())) {
          throw new IllegalStateException("Vertex: "
              + vertex.getName()
              + " contains an outgoing vertex and Output with the same name: "
              + output.getName());
        }
      }
    }
    
    
    // Not checking for repeated input names / output names vertex names on the same vertex,
    // since we only allow 1 at the moment.
    // When additional inputs are supported, this can be chceked easily (and early)
    // within the addInput / addOutput call itself.

    detectCycles(edgeMap, vertexMap);
    
    checkAndInferOneToOneParallelism();

    if (restricted) {
      for (Edge e : edges) {
        if (e.getEdgeProperty().getDataSourceType() !=
          DataSourceType.PERSISTED) {
          throw new IllegalStateException(
            "Unsupported source type on edge. " + e);
        }
        if (e.getEdgeProperty().getSchedulingType() !=
          SchedulingType.SEQUENTIAL) {
          throw new IllegalStateException(
            "Unsupported scheduling type on edge. " + e);
        }
      }
    }
  }