public ContainerExecutionResult run()

in tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezChild.java [215:333]


  /**
   * Runs the main loop of this container: repeatedly fetches a {@code ContainerTask} through a
   * {@code ContainerReporter} submitted to the executor and runs it with a {@code TezTaskRunner2},
   * until the executor is terminated, the shutdown flag is set, the container is asked to die, or
   * an error occurs.
   *
   * <p>Per-task cleanup is exception-safe: only task attempt hooks whose {@code start} call
   * returned normally are stopped, and the FileSystem instances of the task's UGI are closed even
   * if starting or stopping a hook, or the task run itself, throws.
   *
   * @return a result with status {@code SUCCESS} when the loop ends normally or the container is
   *         asked to die (via the fetched task or via the task runner result);
   *         {@code EXECUTION_FAILURE} when fetching new work fails with an
   *         {@code ExecutionException} or the task runner reports an error; {@code INTERRUPTED}
   *         when the thread is interrupted while waiting for new work
   * @throws IOException propagated from the helpers invoked while setting up, running or cleaning
   *         up a task
   * @throws InterruptedException propagated from the helpers invoked here; an interrupt while
   *         waiting for new work is reported through the returned result instead
   * @throws TezException propagated from the helpers invoked here
   */
  public ContainerExecutionResult run() throws IOException, InterruptedException, TezException {

    ContainerContext containerContext = new ContainerContext(containerIdString);
    ContainerReporter containerReporter = new ContainerReporter(umbilical, containerContext,
        getTaskMaxSleepTime);

    taskReporter = new TaskReporter(umbilical, amHeartbeatInterval,
        sendCounterInterval, maxEventsToGet, heartbeatCounter, containerIdString);

    // Carried across iterations: handleNewTaskCredentials receives the previous task's UGI.
    UserGroupInformation childUGI = null;

    while (!executor.isTerminated() && !isShutdown.get()) {
      if (taskCount > 0) {
        // At least one task already ran here: switch the loggers back to the empty addend
        // while waiting for the next task.
        TezUtilsInternal.updateLoggers(defaultConf, "", LoggingUtils.getPatternForTask(defaultConf));
      }
      ListenableFuture<ContainerTask> getTaskFuture = executor.submit(containerReporter);
      boolean error = false;
      ContainerTask containerTask = null;
      try {
        containerTask = getTaskFuture.get();
      } catch (ExecutionException e) {
        error = true;
        Throwable cause = e.getCause();
        LOG.error("Error fetching new work for container {}", containerIdString,
            cause);
        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
            cause, "Execution Exception while fetching new work: " + e.getMessage());
      } catch (InterruptedException e) {
        error = true;
        LOG.info("Interrupted while waiting for new work for container {}", containerIdString);
        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.INTERRUPTED, e,
            "Interrupted while waiting for new work");
      } finally {
        // Runs after the failure result above has been built, right before it is returned.
        if (error) {
          shutdown();
        }
      }

      TezCommonUtils.logCredentials(LOG, containerTask.getCredentials(), "containerTask");
      if (containerTask.shouldDie()) {
        LOG.info("ContainerTask returned shouldDie=true for container {}, Exiting", containerIdString);
        shutdown();
        return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
            "Asked to die by the AM");
      }

      TezTaskAttemptID attemptId = containerTask.getTaskSpec().getTaskAttemptID();

      // Use a private copy of the default configuration when the task spec carries its own
      // configuration, so that defaultConf itself is never modified by the merge.
      Configuration taskConf;
      if (containerTask.getTaskSpec().getTaskConf() != null) {
        taskConf = new Configuration(defaultConf);
        TezTaskRunner2.mergeTaskSpecConfToConf(containerTask.getTaskSpec(), taskConf);
      } else {
        taskConf = defaultConf;
      }
      LoggingUtils.initLoggingContext(mdcContext, taskConf,
          attemptId.getTaskID().getVertexID().getDAGID().toString(), attemptId.toString());

      String loggerAddend = attemptId.toString();
      taskCount++;
      String timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(Calendar.getInstance().getTime());
      // The same start banner is written to both stderr and stdout.
      String startMessage = timeStamp + " Starting to run new task attempt: " + attemptId.toString();
      System.err.println(startMessage);
      System.out.println(startMessage);
      TezUtilsInternal.setHadoopCallerContext(hadoopShim, attemptId);
      TezUtilsInternal.updateLoggers(defaultConf, loggerAddend, LoggingUtils.getPatternForTask(defaultConf));

      FileSystem.clearStatistics();

      childUGI = handleNewTaskCredentials(containerTask, childUGI);
      TezCommonUtils.logCredentials(LOG, childUGI.getCredentials(), "taskChildUGI");
      handleNewTaskLocalResources(containerTask, childUGI);
      cleanupOnTaskChanged(containerTask);

      // Execute the Actual Task
      TezTaskRunner2 taskRunner = new TezTaskRunner2(defaultConf, childUGI,
          localDirs, containerTask.getTaskSpec(), appAttemptNumber,
          serviceConsumerMetadata, serviceProviderEnvMap, startedInputsMap, taskReporter,
          executor, objectRegistry, pid, executionContext, memAvailable, updateSysCounters,
          hadoopShim, sharedExecutor);

      final String[] hookClasses = taskConf
          .getStrings(TezConfiguration.TEZ_TASK_ATTEMPT_HOOKS, new String[0]);
      final TezTaskAttemptHook[] hooks = new TezTaskAttemptHook[hookClasses.length];
      // Number of hooks whose start() returned normally; exactly these are stopped in finally.
      int startedHooks = 0;
      try {
        // Hooks are started inside the try so that a failure while instantiating or starting
        // a later hook still stops the earlier ones and closes the UGI's FileSystems.
        for (int i = 0; i < hooks.length; i++) {
          hooks[i] = ReflectionUtils.createClazzInstance(hookClasses[i]);
          hooks[i].start(attemptId, taskConf);
          startedHooks++;
        }

        TaskRunner2Result result = taskRunner.run();
        LOG.info("TaskRunner2Result: {}", result);
        if (result.isContainerShutdownRequested()) {
          LOG.info("Got a shouldDie notification via heartbeats for container {}. Shutting down", containerIdString);
          shutdown();
          return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
              "Asked to die by the AM");
        }
        if (result.getError() != null) {
          Throwable e = result.getError();
          handleError(e);
          return new ContainerExecutionResult(
              ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE,
              e, "TaskExecutionFailure: " + e.getMessage());
        }
      } finally {
        try {
          for (int i = 0; i < startedHooks; i++) {
            hooks[i].stop();
          }
        } finally {
          // Nested finally: a throwing hook.stop() must not skip closing the FileSystems.
          FileSystem.closeAllForUGI(childUGI);
        }
      }
    }
    return new ContainerExecutionResult(ContainerExecutionResult.ExitStatus.SUCCESS, null,
        null);
  }