in tez-api/src/main/java/org/apache/tez/dag/api/DAG.java [515:673]
/**
 * Builds the {@link DAGPlan} protobuf message describing this DAG.
 *
 * <p>Calls {@code verify(true)} first, then copies into the plan, in order: the DAG name,
 * every vertex group (members, outputs and per-destination merged inputs), every vertex
 * (processor, root inputs, leaf outputs, task configuration, task location hints, vertex
 * manager plugin and in/out edge ids), every edge, the key/value pairs of {@code dagConf},
 * and the DAG credentials when they are set.
 *
 * @param dagConf configuration whose entries are copied into the plan; may be {@code null},
 *                in which case no DAG key/values are set on the plan
 * @return the built plan
 * @throws TezUncheckedException if a local resource of type {@code PATTERN} has a null or
 *         empty pattern, or if a task location hint carries an affinitized container
 */
public DAGPlan createDag(Configuration dagConf) {
  verify(true);

  DAGPlan.Builder dagBuilder = DAGPlan.newBuilder();
  dagBuilder.setName(this.name);

  // Vertex groups: name, member vertex names, outputs and merged inputs keyed by destination.
  for (VertexGroup av : vertexGroups) {
    GroupInfo groupInfo = av.getGroupInfo();
    PlanVertexGroupInfo.Builder groupBuilder = PlanVertexGroupInfo.newBuilder();
    groupBuilder.setGroupName(groupInfo.getGroupName());
    for (Vertex v : groupInfo.getMembers()) {
      groupBuilder.addGroupMembers(v.getName());
    }
    groupBuilder.addAllOutputs(groupInfo.outputs);
    for (Map.Entry<String, InputDescriptor> entry :
        groupInfo.edgeMergedInputs.entrySet()) {
      groupBuilder.addEdgeMergedInputs(
          PlanGroupInputEdgeInfo.newBuilder()
              .setDestVertexName(entry.getKey())
              .setMergedInput(DagTypeConverters.convertToDAGPlan(entry.getValue())));
    }
    dagBuilder.addVertexGroups(groupBuilder);
  }

  // Vertices.
  for (Vertex vertex : vertices.values()) {
    VertexPlan.Builder vertexBuilder = VertexPlan.newBuilder();
    vertexBuilder.setName(vertex.getName());
    vertexBuilder.setType(PlanVertexType.NORMAL); // vertex type is implicitly NORMAL until TEZ-46.
    vertexBuilder.setProcessorDescriptor(
        DagTypeConverters.convertToDAGPlan(vertex.getProcessorDescriptor()));

    for (RootInputLeafOutput<InputDescriptor> input : vertex.getInputs()) {
      vertexBuilder.addInputs(DagTypeConverters.convertToDAGPlan(input));
    }
    for (RootInputLeafOutput<OutputDescriptor> output : vertex.getOutputs()) {
      vertexBuilder.addOutputs(DagTypeConverters.convertToDAGPlan(output));
    }

    // Task configuration: parallelism, resources, launch options, local files, environment.
    PlanTaskConfiguration.Builder taskConfigBuilder = PlanTaskConfiguration.newBuilder();
    Resource resource = vertex.getTaskResource();
    taskConfigBuilder.setNumTasks(vertex.getParallelism());
    taskConfigBuilder.setMemoryMb(resource.getMemory());
    taskConfigBuilder.setVirtualCores(resource.getVirtualCores());
    taskConfigBuilder.setJavaOpts(vertex.getTaskLaunchCmdOpts());
    taskConfigBuilder.setTaskModule(vertex.getName());

    for (Entry<String, LocalResource> entry :
        vertex.getTaskLocalFiles().entrySet()) {
      LocalResource lr = entry.getValue();
      // A fresh builder per resource: the pattern field is only set for PATTERN resources,
      // so a shared builder would carry a previous resource's pattern into later ones.
      PlanLocalResource.Builder localResourcesBuilder = PlanLocalResource.newBuilder();
      localResourcesBuilder.setName(entry.getKey());
      localResourcesBuilder.setUri(
          DagTypeConverters.convertToDAGPlan(lr.getResource()));
      localResourcesBuilder.setSize(lr.getSize());
      localResourcesBuilder.setTimeStamp(lr.getTimestamp());
      localResourcesBuilder.setType(
          DagTypeConverters.convertToDAGPlan(lr.getType()));
      localResourcesBuilder.setVisibility(
          DagTypeConverters.convertToDAGPlan(lr.getVisibility()));
      if (lr.getType() == LocalResourceType.PATTERN) {
        if (lr.getPattern() == null || lr.getPattern().isEmpty()) {
          throw new TezUncheckedException("LocalResource type set to pattern"
              + " but pattern is null or empty");
        }
        localResourcesBuilder.setPattern(lr.getPattern());
      }
      taskConfigBuilder.addLocalResource(localResourcesBuilder);
    }

    for (Map.Entry<String, String> envEntry : vertex.getTaskEnvironment().entrySet()) {
      PlanKeyValuePair.Builder envSettingBuilder = PlanKeyValuePair.newBuilder();
      envSettingBuilder.setKey(envEntry.getKey());
      envSettingBuilder.setValue(envEntry.getValue());
      taskConfigBuilder.addEnvironmentSetting(envSettingBuilder);
    }

    // Task location hints: hosts and racks only; container affinity is rejected here.
    if (vertex.getTaskLocationsHint() != null
        && vertex.getTaskLocationsHint().getTaskLocationHints() != null) {
      for (TaskLocationHint hint : vertex.getTaskLocationsHint().getTaskLocationHints()) {
        PlanTaskLocationHint.Builder taskLocationHintBuilder = PlanTaskLocationHint.newBuilder();
        if (hint.getAffinitizedContainer() != null) {
          throw new TezUncheckedException(
              "Container affinity may not be specified via the DAG API");
        }
        if (hint.getDataLocalHosts() != null) {
          taskLocationHintBuilder.addAllHost(hint.getDataLocalHosts());
        }
        if (hint.getRacks() != null) {
          taskLocationHintBuilder.addAllRack(hint.getRacks());
        }
        vertexBuilder.addTaskLocationHint(taskLocationHintBuilder);
      }
    }

    if (vertex.getVertexManagerPlugin() != null) {
      vertexBuilder.setVertexManagerPlugin(
          DagTypeConverters.convertToDAGPlan(vertex.getVertexManagerPlugin()));
    }

    for (Edge inEdge : vertex.getInputEdges()) {
      vertexBuilder.addInEdgeId(inEdge.getId());
    }
    for (Edge outEdge : vertex.getOutputEdges()) {
      vertexBuilder.addOutEdgeId(outEdge.getId());
    }

    vertexBuilder.setTaskConfig(taskConfigBuilder);
    dagBuilder.addVertex(vertexBuilder);
  }

  // Edges.
  for (Edge edge : edges) {
    EdgePlan.Builder edgeBuilder = EdgePlan.newBuilder();
    edgeBuilder.setId(edge.getId());
    edgeBuilder.setInputVertexName(edge.getInputVertex().getName());
    edgeBuilder.setOutputVertexName(edge.getOutputVertex().getName());
    edgeBuilder.setDataMovementType(
        DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataMovementType()));
    edgeBuilder.setDataSourceType(
        DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getDataSourceType()));
    edgeBuilder.setSchedulingType(
        DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getSchedulingType()));
    edgeBuilder.setEdgeSource(
        DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeSource()));
    edgeBuilder.setEdgeDestination(
        DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeDestination()));
    if (edge.getEdgeProperty().getDataMovementType() == DataMovementType.CUSTOM) {
      if (edge.getEdgeProperty().getEdgeManagerDescriptor() != null) {
        edgeBuilder.setEdgeManager(
            DagTypeConverters.convertToDAGPlan(edge.getEdgeProperty().getEdgeManagerDescriptor()));
      } // else the AM will deal with this.
    }
    dagBuilder.addEdge(edgeBuilder);
  }

  // DAG-level configuration: every entry of dagConf becomes a key/value pair in the plan.
  if (dagConf != null) {
    Iterator<Entry<String, String>> iter = dagConf.iterator();
    ConfigurationProto.Builder confProtoBuilder =
        ConfigurationProto.newBuilder();
    while (iter.hasNext()) {
      Entry<String, String> entry = iter.next();
      PlanKeyValuePair.Builder kvp = PlanKeyValuePair.newBuilder();
      kvp.setKey(entry.getKey());
      kvp.setValue(entry.getValue());
      confProtoBuilder.addConfKeyValues(kvp);
    }
    dagBuilder.setDagKeyValues(confProtoBuilder);
  }

  // Credentials are embedded in the plan and handed to LogUtils.logCredentials.
  if (credentials != null) {
    dagBuilder.setCredentialsBinary(DagTypeConverters.convertCredentialsToProto(credentials));
    LogUtils.logCredentials(LOG, credentials, "dag");
  }

  return dagBuilder.build();
}