in tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java [980:1174]
/**
 * Creates a vertex from its entry in the DAG plan.
 *
 * <p>The constructor performs, in this order: configuration setup (DAG configuration plus
 * vertex-level overrides from the plan), plain field wiring, conversion of the plan's task
 * config into resources / local resources / environment / java opts, logging setup
 * (default and task-specific), creation of the default container context, registration of
 * additional inputs and outputs, service initialization, resolution of the task scheduler /
 * container launcher / task communicator plugins, and finally creation of the state machine.
 * Several of these steps hand {@code this} to other objects or call instance methods while
 * the object is still only partially initialized, so the statement order matters.
 *
 * @param vertexId identifier of this vertex; also used to look up its recovery data
 * @param vertexPlan plan entry for this vertex; source of the vertex-level configuration,
 *     task config, processor descriptor, inputs/outputs and optional execution context
 * @param vertexName name of the vertex; stored in interned form
 * @param dagConf DAG configuration; copied, then overlaid with the plan's vertex settings
 * @param eventHandler event handler stored for later use by this vertex
 * @param taskCommunicatorManagerInterface task communicator manager stored for later use
 * @param clock clock stored for later use
 * @param thh task heartbeat handler stored for later use
 * @param commitVertexOutputs flag stored as-is in {@code commitVertexOutputs}
 * @param appContext application context; supplies the current DAG, the DAG recovery data
 *     and the plugin identifier / name lookups
 * @param vertexLocationHint location hint passed to {@code setTaskLocationHints}
 * @param dagVertexGroups vertex groups of the DAG, stored as-is
 * @param taskSpecificLaunchCmdOption task-specific launch options; dereferenced
 *     unconditionally here, so it must not be {@code null}
 * @param entityStatusTracker stored as the {@code stateChangeNotifier}
 * @param dagOnlyConf DAG-only configuration; copied, then overlaid with the plan's vertex
 *     settings
 */
public VertexImpl(TezVertexID vertexId, VertexPlan vertexPlan,
String vertexName, Configuration dagConf, EventHandler eventHandler,
TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock,
TaskHeartbeatHandler thh, boolean commitVertexOutputs,
AppContext appContext, VertexLocationHint vertexLocationHint,
Map<String, VertexGroupInfo> dagVertexGroups, TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption,
StateChangeNotifier entityStatusTracker, Configuration dagOnlyConf) {
this.vertexId = vertexId;
this.vertexPlan = vertexPlan;
this.vertexName = StringInterner.intern(vertexName);
// Work on copies of both configurations; the vertex-level settings from the plan are
// applied to these copies below.
this.vertexConf = new Configuration(dagConf);
this.vertexOnlyConf = new Configuration(dagOnlyConf);
if (vertexPlan.hasVertexConf()) {
ConfigurationProto confProto = vertexPlan.getVertexConf();
for (PlanKeyValuePair keyValuePair : confProto.getConfKeyValuesList()) {
// Each key is validated against Scope.VERTEX before it is applied to both copies.
// NOTE(review): presumably validateProperty rejects keys that are not allowed at
// vertex scope - confirm in TezConfiguration.
TezConfiguration.validateProperty(keyValuePair.getKey(), Scope.VERTEX);
vertexConf.set(keyValuePair.getKey(), keyValuePair.getValue());
vertexOnlyConf.set(keyValuePair.getKey(), keyValuePair.getValue());
}
}
this.vertexContextConfig = new VertexConfigImpl(vertexConf);
this.clock = clock;
this.appContext = appContext;
this.commitVertexOutputs = commitVertexOutputs;
// Built from getVertexId() and getName(), so it must come after the vertexId and
// vertexName assignments above (assuming those getters return these fields).
this.logIdentifier = this.getVertexId() + " [" + this.getName() + "]";
this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
this.taskHeartbeatHandler = thh;
this.eventHandler = eventHandler;
// A single read/write lock provides both lock views kept by this vertex.
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
this.readLock = readWriteLock.readLock();
this.writeLock = readWriteLock.writeLock();
// Location hints are only logged when debug logging is enabled.
if (LOG.isDebugEnabled()) {
logLocationHints(this.vertexName, vertexLocationHint);
}
setTaskLocationHints(vertexLocationHint);
this.dagUgi = appContext.getCurrentDAG().getDagUGI();
this.dag = appContext.getCurrentDAG();
// Convert the plan's task config into the runtime representations used by the vertex.
this.taskResource = DagTypeConverters
.createResourceRequestFromTaskConfig(vertexPlan.getTaskConfig());
this.processorDescriptor = DagTypeConverters
.convertProcessorDescriptorFromDAGPlan(vertexPlan
.getProcessorDescriptor());
this.localResources = DagTypeConverters
.createLocalResourceMapFromDAGPlan(vertexPlan.getTaskConfig()
.getLocalResourceList());
// The DAG's local resources are added after the vertex's own; assuming standard
// Map.putAll semantics, a DAG entry replaces a vertex entry with the same key.
this.localResources.putAll(dag.getLocalResources());
this.environment = DagTypeConverters
.createEnvironmentMapFromDAGPlan(vertexPlan.getTaskConfig()
.getEnvironmentSettingList());
this.taskSpecificLaunchCmdOpts = taskSpecificLaunchCmdOption;
// Recovery data is null when the app context has no DAG recovery data.
this.recoveryData = appContext.getDAGRecoveryData() == null ?
null : appContext.getDAGRecoveryData().getVertexRecoveryData(vertexId);
// Set up log properties, including task specific log properties.
// The java opts from the plan (or null when the plan has none) are the common base for
// both the default and the task-specific java opts.
String javaOptsWithoutLoggerMods =
vertexPlan.getTaskConfig().hasJavaOpts() ? vertexPlan.getTaskConfig().getJavaOpts() : null;
String logString = vertexConf.get(TezConfiguration.TEZ_TASK_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL_DEFAULT);
// NOTE(review): element 0 of the parsed params is presumably the default log level and
// element 1 (when present) the per-logger settings - confirm in TezClientUtils.
String [] taskLogParams = TezClientUtils.parseLogParams(logString);
this.javaOpts = TezClientUtils.maybeAddDefaultLoggingJavaOpts(taskLogParams[0], javaOptsWithoutLoggerMods);
if (taskSpecificLaunchCmdOpts.hasModifiedLogProperties()) {
String [] taskLogParamsTaskSpecific = taskSpecificLaunchCmdOption.getTaskSpecificLogParams();
this.javaOptsTaskSpecific = TezClientUtils
.maybeAddDefaultLoggingJavaOpts(taskLogParamsTaskSpecific[0], javaOptsWithoutLoggerMods);
// Copy the base environment BEFORE the default log params are added to it further
// down, so the task-specific environment carries only the task-specific log params.
environmentTaskSpecific = new HashMap<String, String>(this.environment.size());
environmentTaskSpecific.putAll(environment);
// Only touch the copy when a second, non-empty log parameter is present.
if (taskLogParamsTaskSpecific.length == 2 && !Strings.isNullOrEmpty(taskLogParamsTaskSpecific[1])) {
TezClientUtils.addLogParamsToEnv(environmentTaskSpecific, taskLogParamsTaskSpecific);
}
} else {
// No task-specific log properties: both task-specific values stay null.
this.javaOptsTaskSpecific = null;
this.environmentTaskSpecific = null;
}
// Environment for tasks which don't have task-specific configuration. The default log
// params are added only now, after the optional task-specific copy above was taken.
TezClientUtils.addLogParamsToEnv(this.environment, taskLogParams);
// Note: `this` is handed to the ContainerContext while the vertex is still under
// construction.
this.containerContext = new ContainerContext(this.localResources,
appContext.getCurrentDAG().getCredentials(), this.environment, this.javaOpts, this);
LOG.info("Default container context for " + logIdentifier + "=" + containerContext + ", Default Resources=" + this.taskResource);
// Additional inputs/outputs are registered only when the plan declares any.
if (vertexPlan.getInputsCount() > 0) {
setAdditionalInputs(vertexPlan.getInputsList());
}
if (vertexPlan.getOutputsCount() > 0) {
setAdditionalOutputs(vertexPlan.getOutputsList());
}
this.stateChangeNotifier = entityStatusTracker;
// Setup the initial parallelism early. This may be changed after
// initialization or on a setParallelism call.
this.numTasks = vertexPlan.getTaskConfig().getNumTasks();
// Not sending the notifier a parallelism update for the numTasks assignment above,
// since this is the initial parallelism.
this.dagVertexGroups = dagVertexGroups;
isSpeculationEnabled =
vertexConf.getBoolean(TezConfiguration.TEZ_AM_SPECULATION_ENABLED,
TezConfiguration.TEZ_AM_SPECULATION_ENABLED_DEFAULT);
// NOTE(review): initServices() runs on the partially constructed object; the fields
// assigned below this point are not yet set when it executes - confirm it only relies
// on state initialized above.
servicesInited = new AtomicBoolean(false);
initServices();
maxFailuresPercent = vertexConf.getFloat(TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT,
TezConfiguration.TEZ_VERTEX_FAILURES_MAXPERCENT_DEFAULT);
// Plugin resolution starts here. The default plugin name is the uber plugin in local
// mode and the YARN plugin otherwise.
boolean isLocal = vertexConf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
String tezDefaultComponentName =
isLocal ? TezConstants.getTezUberServicePluginName() :
TezConstants.getTezYarnServicePluginName();
// Start from the DAG's default execution context; an execution context declared on the
// vertex plan takes precedence over it.
org.apache.tez.dag.api.Vertex.VertexExecutionContext execContext = dag.getDefaultExecutionContext();
if (vertexPlan.hasExecutionContext()) {
execContext = DagTypeConverters.convertFromProto(vertexPlan.getExecutionContext());
LOG.info("Using ExecutionContext from Vertex for Vertex {}", vertexName);
} else if (execContext != null) {
LOG.info("Using ExecutionContext from DAG for Vertex {}", vertexName);
}
// An execution context that asks to execute in the AM switches the default plugin name
// to the uber plugin, regardless of the local-mode setting.
if (execContext != null) {
if (execContext.shouldExecuteInAm()) {
tezDefaultComponentName = TezConstants.getTezUberServicePluginName();
}
}
// Each plugin name falls back to the default unless the execution context names one
// explicitly.
String taskSchedulerName = tezDefaultComponentName;
String containerLauncherName = tezDefaultComponentName;
String taskCommName = tezDefaultComponentName;
if (execContext != null) {
if (execContext.getTaskSchedulerName() != null) {
taskSchedulerName = execContext.getTaskSchedulerName();
}
if (execContext.getContainerLauncherName() != null) {
containerLauncherName = execContext.getContainerLauncherName();
}
if (execContext.getTaskCommName() != null) {
taskCommName = execContext.getTaskCommName();
}
}
// Resolve the plugin names to identifiers. A failed lookup is logged with the offending
// name and the exception is rethrown unchanged, aborting construction.
try {
taskSchedulerIdentifier = appContext.getTaskScheduerIdentifier(taskSchedulerName);
} catch (Exception e) {
LOG.error("Failed to get index for taskScheduler: " + taskSchedulerName);
throw e;
}
try {
taskCommunicatorIdentifier = appContext.getTaskCommunicatorIdentifier(taskCommName);
} catch (Exception e) {
LOG.error("Failed to get index for taskCommunicator: " + taskCommName);
throw e;
}
try {
containerLauncherIdentifier =
appContext.getContainerLauncherIdentifier(containerLauncherName);
} catch (Exception e) {
LOG.error("Failed to get index for containerLauncher: " + containerLauncherName);
throw e;
}
// The plugin info records the names and class names that the app context reports for
// the resolved identifiers (not the requested name strings used above).
this.servicePluginInfo = new ServicePluginInfo()
.setContainerLauncherName(
appContext.getContainerLauncherName(this.containerLauncherIdentifier))
.setTaskSchedulerName(appContext.getTaskSchedulerName(this.taskSchedulerIdentifier))
.setTaskCommunicatorName(appContext.getTaskCommunicatorName(this.taskCommunicatorIdentifier))
.setContainerLauncherClassName(
appContext.getContainerLauncherClassName(this.containerLauncherIdentifier))
.setTaskSchedulerClassName(
appContext.getTaskSchedulerClassName(this.taskSchedulerIdentifier))
.setTaskCommunicatorClassName(
appContext.getTaskCommunicatorClassName(this.taskCommunicatorIdentifier));
// Log each resolved identifier together with the plugin name that was requested for it.
StringBuilder sb = new StringBuilder();
sb.append("Running vertex: ").append(logIdentifier).append(" : ")
.append("TaskScheduler=").append(taskSchedulerIdentifier).append(":").append(taskSchedulerName)
.append(", ContainerLauncher=").append(containerLauncherIdentifier).append(":").append(containerLauncherName)
.append(", TaskCommunicator=").append(taskCommunicatorIdentifier).append(":").append(taskCommName);
LOG.info(sb.toString());
// Vertex-level shuffle cleanup is enabled only when the configured cleanup height is
// positive AND ShuffleUtils.isTezShuffleHandler(vertexConf) returns true.
cleanupShuffleDataAtVertexLevel = vertexConf.getInt(TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT,
TezConfiguration.TEZ_AM_VERTEX_CLEANUP_HEIGHT_DEFAULT) > 0 &&
ShuffleUtils.isTezShuffleHandler(vertexConf);
// NOTE(review): `this` is passed out of the constructor here. The pre-existing remark
// that this "this leak" is okay because the retained pointer is in an instance variable
// appears to refer to this statement (the state machine is kept in the stateMachine
// field) - confirm.
stateMachine = new StateMachineTez<VertexState, VertexEventType, VertexEvent, VertexImpl>(
stateMachineFactory.make(this), this);
augmentStateMachine();
}