in tez-api/src/main/java/org/apache/tez/dag/api/DAG.java [331:450]
/**
 * Validates this DAG, throwing on the first violation encountered.
 *
 * <p>The checks run in the following order:
 * <ol>
 *   <li>the DAG contains at least one vertex;</li>
 *   <li>{@code processEdgesAndGroups()} is invoked;</li>
 *   <li>no two vertices share a name;</li>
 *   <li>no Input or Output of any vertex is named like a vertex of the DAG;</li>
 *   <li>no Input of a vertex is named like a vertex with an edge into it;</li>
 *   <li>no Output of a vertex is named like a vertex it has an edge to;</li>
 *   <li>{@code detectCycles(...)} and {@code checkAndInferOneToOneParallelism()}
 *       are invoked;</li>
 *   <li>in restricted mode only, every edge uses {@code DataSourceType.PERSISTED}
 *       and {@code SchedulingType.SEQUENTIAL}.</li>
 * </ol>
 *
 * @param restricted when {@code true}, additionally reject every edge whose data
 *        source type is not {@code PERSISTED} or whose scheduling type is not
 *        {@code SEQUENTIAL}
 * @throws IllegalStateException if any of the checks listed above fails
 */
public void verify(boolean restricted) throws IllegalStateException {
  if (vertices.isEmpty()) {
    throw new IllegalStateException("Invalid dag containing 0 vertices");
  }

  processEdgesAndGroups();

  // Index the vertices by name, refusing duplicates. The index is also handed
  // to detectCycles() further down.
  Map<String, AnnotatedVertex> annotatedByName = new HashMap<String, AnnotatedVertex>();
  for (Vertex candidate : vertices.values()) {
    String candidateName = candidate.getName();
    if (annotatedByName.containsKey(candidateName)) {
      throw new IllegalStateException(
          "DAG contains multiple vertices with name: " + candidateName);
    }
    annotatedByName.put(candidateName, new AnnotatedVertex(candidate));
  }

  // One pass over the edges builds three views:
  //   outgoingEdges   - edges grouped by their input vertex (for cycle detection)
  //   upstreamNames   - per output vertex, the names of vertices feeding it
  //   downstreamNames - per input vertex, the names of vertices it feeds
  Map<Vertex, List<Edge>> outgoingEdges = new HashMap<Vertex, List<Edge>>();
  Map<Vertex, Set<String>> upstreamNames = new HashMap<Vertex, Set<String>>();
  Map<Vertex, Set<String>> downstreamNames = new HashMap<Vertex, Set<String>>();
  for (Edge edge : edges) {
    Vertex from = edge.getInputVertex();
    Vertex to = edge.getOutputVertex();

    if (!outgoingEdges.containsKey(from)) {
      outgoingEdges.put(from, new ArrayList<Edge>());
    }
    outgoingEdges.get(from).add(edge);

    if (!upstreamNames.containsKey(to)) {
      upstreamNames.put(to, new HashSet<String>());
    }
    upstreamNames.get(to).add(from.getName());

    if (!downstreamNames.containsKey(from)) {
      downstreamNames.put(from, new HashSet<String>());
    }
    downstreamNames.get(from).add(to.getName());
  }

  // An Input or Output may not reuse the name of any vertex in the DAG.
  for (Vertex owner : vertices.values()) {
    for (RootInputLeafOutput<InputDescriptor> in : owner.getInputs()) {
      if (annotatedByName.containsKey(in.getName())) {
        throw new IllegalStateException("Vertex: " + owner.getName()
            + " contains an Input with the same name as vertex: " + in.getName());
      }
    }
    for (RootInputLeafOutput<OutputDescriptor> out : owner.getOutputs()) {
      if (annotatedByName.containsKey(out.getName())) {
        throw new IllegalStateException("Vertex: " + owner.getName()
            + " contains an Output with the same name as vertex: " + out.getName());
      }
    }
  }

  // An Input may not be named like a vertex that has an edge into its owner.
  for (Entry<Vertex, Set<String>> inbound : upstreamNames.entrySet()) {
    Vertex consumer = inbound.getKey();
    Set<String> feederNames = inbound.getValue();
    for (RootInputLeafOutput<InputDescriptor> in : consumer.getInputs()) {
      if (feederNames.contains(in.getName())) {
        throw new IllegalStateException("Vertex: " + consumer.getName()
            + " contains an incoming vertex and Input with the same name: " + in.getName());
      }
    }
  }

  // An Output may not be named like a vertex that its owner has an edge to.
  for (Entry<Vertex, Set<String>> outbound : downstreamNames.entrySet()) {
    Vertex producer = outbound.getKey();
    Set<String> fedNames = outbound.getValue();
    for (RootInputLeafOutput<OutputDescriptor> out : producer.getOutputs()) {
      if (fedNames.contains(out.getName())) {
        throw new IllegalStateException("Vertex: " + producer.getName()
            + " contains an outgoing vertex and Output with the same name: " + out.getName());
      }
    }
  }

  // Repeated input / output names on a single vertex are not checked here,
  // since only one is allowed at the moment. Once additional inputs are
  // supported, that can be checked easily (and early) inside the
  // addInput / addOutput call itself.

  detectCycles(outgoingEdges, annotatedByName);

  checkAndInferOneToOneParallelism();

  if (!restricted) {
    return;
  }

  // Restricted mode: only PERSISTED + SEQUENTIAL edges are accepted.
  for (Edge edge : edges) {
    if (DataSourceType.PERSISTED != edge.getEdgeProperty().getDataSourceType()) {
      throw new IllegalStateException("Unsupported source type on edge. " + edge);
    }
    if (SchedulingType.SEQUENTIAL != edge.getEdgeProperty().getSchedulingType()) {
      throw new IllegalStateException("Unsupported scheduling type on edge. " + edge);
    }
  }
}