in src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java [110:304]
/**
 * Compiles {@code php} into a container of Tez DAGs, runs them one at a time and
 * returns the statistics collected for the whole script.
 *
 * <p>In order, this method:
 * <ol>
 *   <li>lazily creates the single-threaded executor that DAGs are submitted to;</li>
 *   <li>applies Tez local-mode settings to {@code pc}'s properties when the exec type is local;</li>
 *   <li>builds the job {@link Configuration}, raising the MapReduce counter limits to at least
 *       the Tez counter limits, and (best effort) re-initialises Hadoop's {@code Limits};</li>
 *   <li>defaults the Tez AM staging directory to Pig's Tez resources directory;</li>
 *   <li>repeatedly asks the plan container for the next plan: a plan consisting of a single
 *       {@code NativeTezOper} is run directly, any other plan is compiled into a DAG, submitted
 *       to the executor and polled once per second until it finishes;</li>
 *   <li>calls {@code cleanupOnSuccess}/{@code cleanupOnFailure} on every output's store function.</li>
 * </ol>
 *
 * @param php the physical plan to execute
 * @param grpName not used by this method
 * @param pc the Pig context; its properties may be modified (local-mode settings) and its
 *     {@code defaultParallel} is set to 1 when it is -1 and Tez auto parallelism is disabled
 * @return the {@code TezPigScriptStats} gathered while running the DAGs
 * @throws ExecException if a store cleanup hook fails with an {@link IOException}, or if the
 *     {@code stop.on.failure} property is true and a DAG failed (error code 6017)
 * @throws Exception any other failure raised while compiling, submitting or running the DAGs
 */
public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
    synchronized (this) {
        if (executor == null) {
            executor = Executors.newSingleThreadExecutor(namedThreadFactory);
        }
    }
    if (pc.getExecType().isLocal()) {
        pc.getProperties().setProperty(TezConfiguration.TEZ_LOCAL_MODE, "true");
        pc.getProperties().setProperty(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "true");
        pc.getProperties().setProperty(TezConfiguration.TEZ_IGNORE_LIB_URIS, "true");
        pc.getProperties().setProperty(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS,
                DAGSchedulerNaturalOrderControlled.class.getName());
    }
    Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
    // Make sure MR counters do not exceed the limit: when Tez counter limits are configured,
    // raise the corresponding MapReduce limits to at least the Tez values.
    if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX) != null) {
        conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, Math.max(
                conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTERS_MAX_KEY, 0),
                conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX, 0)));
    }
    if (conf.get(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS) != null) {
        conf.setInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, Math.max(
                conf.getInt(org.apache.hadoop.mapreduce.MRJobConfig.COUNTER_GROUPS_MAX_KEY, 0),
                conf.getInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 0)));
    }
    // This is hacky, but Limits cannot be initialized twice: reset its private static
    // "isInited" flag reflectively so init(conf) applies the limits computed above.
    // Best effort only — any failure is logged and otherwise ignored.
    try {
        Field f = Limits.class.getDeclaredField("isInited");
        f.setAccessible(true);
        f.setBoolean(null, false);
        Limits.init(conf);
    } catch (Throwable e) {
        log.warn("Error when setting counter limit: " + e.getMessage());
    }
    if (pc.defaultParallel == -1 && !conf.getBoolean(PigConfiguration.PIG_TEZ_AUTO_PARALLELISM, true)) {
        pc.defaultParallel = 1;
    }
    aggregateWarning = conf.getBoolean("aggregate.warning", false);
    TezResourceManager tezResourceManager = TezResourceManager.getInstance();
    tezResourceManager.init(pc, conf);
    String stagingDir = conf.get(TezConfiguration.TEZ_AM_STAGING_DIR);
    String resourcesDir = tezResourceManager.getResourcesDir().toString();
    if (stagingDir == null) {
        // If not set in tez-site.xml, use Pig's tez resources directory as staging directory
        // instead of TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT
        stagingDir = resourcesDir;
        conf.set(TezConfiguration.TEZ_AM_STAGING_DIR, resourcesDir);
    }
    log.info("Tez staging directory is " + stagingDir + " and resources directory is " + resourcesDir);
    List<TezOperPlan> processedPlans = new ArrayList<TezOperPlan>();
    tezScriptState = TezScriptState.get();
    tezStats = new TezPigScriptStats(pc);
    PigStats.start(tezStats);
    conf.setIfUnset(TezConfiguration.TEZ_USE_CLUSTER_HADOOP_LIBS, "true");
    TezJobCompiler jc = new TezJobCompiler(pc, conf);
    TezPlanContainer tezPlanContainer = compile(php, pc);
    tezStats.initialize(tezPlanContainer);
    tezScriptState.emitInitialPlanNotification(tezPlanContainer);
    // Capture the number of DAGs to launch once, so per-DAG progress is measured against a
    // fixed denominator even if updatePlan() changes the container's contents in the loop.
    final int totalDAGs = tezPlanContainer.size();
    tezScriptState.emitLaunchStartedNotification(totalDAGs); //number of DAGs to Launch
    boolean stopOnFailure =
            Boolean.parseBoolean(pc.getProperties().getProperty("stop.on.failure", "false"));
    boolean stoppedOnFailure = false;
    TezPlanContainerNode tezPlanContainerNode;
    int processedDAGs = 0;
    while ((tezPlanContainerNode = tezPlanContainer.getNextPlan(processedPlans)) != null) {
        TezOperPlan tezPlan = tezPlanContainerNode.getTezOperPlan();
        processLoadAndParallelism(tezPlan, pc);
        processedPlans.add(tezPlan);
        // NOTE(review): the reporter receives the container's current size; confirm whether it
        // should receive totalDAGs instead if updatePlan() shrinks the container.
        ProgressReporter reporter = new ProgressReporter(tezPlanContainer.size(), processedDAGs);
        if (tezPlan.size() == 1 && tezPlan.getRoots().get(0) instanceof NativeTezOper) {
            // Native Tez plan: run the user's job directly on the calling thread.
            NativeTezOper nativeOper = (NativeTezOper) tezPlan.getRoots().get(0);
            tezScriptState.emitJobsSubmittedNotification(1);
            nativeOper.runJob(tezPlanContainerNode.getOperatorKey().toString());
        } else {
            TezPOPackageAnnotator pkgAnnotator = new TezPOPackageAnnotator(tezPlan);
            pkgAnnotator.visit();
            runningJob = jc.compile(tezPlanContainerNode, tezPlanContainer);
            //TODO: Exclude vertex groups from numVerticesToLaunch ??
            tezScriptState.dagLaunchNotification(runningJob.getName(), tezPlan, tezPlan.size());
            runningJob.setPigStats(tezStats);
            // Set the thread UDFContext so registered classes are available.
            final UDFContext udfContext = UDFContext.getUDFContext();
            Runnable task = new Runnable() {
                @Override
                public void run() {
                    Thread.currentThread().setContextClassLoader(PigContext.getClassLoader());
                    UDFContext.setUdfContext(udfContext.clone());
                    runningJob.run();
                }
            };
            // Mark the times that the jobs were submitted so it's reflected in job
            // history props. TODO: Fix this. unused now
            long scriptSubmittedTimestamp = System.currentTimeMillis();
            // Job.getConfiguration returns the shared configuration object
            Configuration jobConf = runningJob.getConfiguration();
            jobConf.set("pig.script.submitted.timestamp",
                    Long.toString(scriptSubmittedTimestamp));
            jobConf.set("pig.job.submitted.timestamp",
                    Long.toString(System.currentTimeMillis()));
            Future<?> future = executor.submit(task);
            tezScriptState.emitJobsSubmittedNotification(1);
            boolean jobStarted = false;
            while (true) {
                // Look for the application id before testing isDone(), so a DAG that finishes
                // within one polling interval still gets its start notifications.
                if (!jobStarted && runningJob.getApplicationId() != null) {
                    jobStarted = true;
                    String appId = runningJob.getApplicationId().toString();
                    //For Oozie Pig action job id matching compatibility with MR mode
                    log.info("HadoopJobId: " + appId.replace("application", "job"));
                    tezScriptState.emitJobStartedNotification(appId);
                    tezScriptState.dagStartedNotification(runningJob.getName(), appId);
                }
                if (future.isDone()) {
                    break;
                }
                reporter.notifyUpdate();
                Thread.sleep(1000);
            }
            // For tez_local mode where PigProcessor destroys all UDFContext
            UDFContext.setUdfContext(udfContext);
            try {
                // In case of FutureTask there is no uncaught exception
                // Need to do future.get() to get any exception
                future.get();
            } catch (ExecutionException e) {
                setJobException(e.getCause());
            }
        }
        processedDAGs++;
        tezScriptState.emitProgressUpdatedNotification(dagProgressPercent(processedDAGs, totalDAGs));
        handleUnCaughtException(pc);
        boolean tezDAGSucceeded = reporter.notifyFinishedOrFailed();
        tezPlanContainer.updatePlan(tezPlan, tezDAGSucceeded);
        // if stop_on_failure is enabled, we need to stop immediately when any job has failed
        if (!tezDAGSucceeded) {
            if (stopOnFailure) {
                stoppedOnFailure = true;
                break;
            } else {
                log.warn("Ooops! Some job has failed! Specify -stop_on_failure if you "
                        + "want Pig to stop immediately on failure.");
            }
        }
    }
    tezStats.finish();
    tezScriptState.emitLaunchCompletedNotification(tezStats.getNumberSuccessfulJobs());
    for (OutputStats output : tezStats.getOutputStats()) {
        POStore store = output.getPOStore();
        try {
            if (!output.isSuccessful()) {
                store.getStoreFunc().cleanupOnFailure(
                        store.getSFile().getFileName(),
                        Job.getInstance(output.getConf()));
            } else {
                store.getStoreFunc().cleanupOnSuccess(
                        store.getSFile().getFileName(),
                        Job.getInstance(output.getConf()));
            }
        } catch (IOException e) {
            throw new ExecException(e);
        } catch (AbstractMethodError nsme) {
            // Just swallow it. This means we're running against an
            // older instance of a StoreFunc that doesn't implement
            // this method.
        }
    }
    if (stoppedOnFailure) {
        throw new ExecException("Stopping execution on job failure with -stop_on_failure option", 6017,
                PigException.REMOTE_ENVIRONMENT);
    }
    return tezStats;
}

/**
 * Returns the percentage (0-100) of DAGs finished so far.
 *
 * <p>Multiplies before dividing so intermediate values are not truncated to 0 by integer
 * division, and reports 100 once every DAG has been processed (or when there is nothing
 * to process).
 *
 * @param processedDAGs number of DAGs processed so far
 * @param totalDAGs number of DAGs the script launches
 * @return completion percentage, capped at 100
 */
private static int dagProgressPercent(int processedDAGs, int totalDAGs) {
    if (totalDAGs <= 0 || processedDAGs >= totalDAGs) {
        return 100;
    }
    return (processedDAGs * 100) / totalDAGs;
}