public int execute()

in ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java [221:490]


  public int execute() {

    IOPrepareCache ioPrepareCache = IOPrepareCache.get();
    ioPrepareCache.clear();

    boolean success = true;

    boolean ctxCreated = false;
    Path emptyScratchDir;
    JobClient jc = null;

    if (taskQueue.isShutdown()) {
      LOG.warn("Task was cancelled");
      return 5;
    }

    MapWork mWork = work.getMapWork();
    ReduceWork rWork = work.getReduceWork();

    Context ctx = context;
    try {
      if (ctx == null) {
        ctx = new Context(job);
        ctxCreated = true;
      }

      emptyScratchDir = ctx.getMRTmpPath();
      FileSystem fs = emptyScratchDir.getFileSystem(job);
      fs.mkdirs(emptyScratchDir);
    } catch (IOException e) {
      console.printError("Error launching map-reduce job", "\n"
          + org.apache.hadoop.util.StringUtils.stringifyException(e));
      return 5;
    }

    HiveFileFormatUtils.prepareJobOutput(job);
    //See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
    job.setOutputFormat(HiveOutputFormatImpl.class);

    job.setMapRunnerClass(ExecMapRunner.class);
    job.setMapperClass(ExecMapper.class);

    job.setMapOutputKeyClass(HiveKey.class);
    job.setMapOutputValueClass(BytesWritable.class);

    try {
      String partitioner = HiveConf.getVar(job, ConfVars.HIVE_PARTITIONER);
      job.setPartitionerClass(JavaUtils.loadClass(partitioner));
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e.getMessage(), e);
    }

    propagateSplitSettings(job, mWork);

    job.setNumReduceTasks(rWork != null ? rWork.getNumReduceTasks() : 0);
    job.setReducerClass(ExecReducer.class);

    // set input format information if necessary
    setInputAttributes(job);

    // HIVE-23354 enforces that MR speculative execution is disabled
    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);

    String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVE_INPUT_FORMAT);

    if (mWork.isUseBucketizedHiveInputFormat()) {
      inpFormat = BucketizedHiveInputFormat.class.getName();
    }

    LOG.info("Using " + inpFormat);

    try {
      job.setInputFormat(JavaUtils.loadClass(inpFormat));
    } catch (ClassNotFoundException e) {
      throw new RuntimeException(e.getMessage(), e);
    }

    // No-Op - we don't really write anything here ..
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    int returnVal = 0;
    boolean noName = StringUtils.isEmpty(job.get(MRJobConfig.JOB_NAME));

    if (noName) {
      // This is for a special case to ensure unit tests pass
      job.set(MRJobConfig.JOB_NAME,
          "JOB" + ThreadLocalRandom.current().nextInt());
    }

    try{
      MapredLocalWork localwork = mWork.getMapRedLocalWork();
      if (localwork != null && localwork.hasStagedAlias()) {
        if (!ShimLoader.getHadoopShims().isLocalMode(job)) {
          Path localPath = localwork.getTmpPath();
          Path hdfsPath = mWork.getTmpHDFSPath();

          FileSystem hdfs = hdfsPath.getFileSystem(job);
          FileSystem localFS = localPath.getFileSystem(job);
          FileStatus[] hashtableFiles = localFS.listStatus(localPath);
          int fileNumber = hashtableFiles.length;
          String[] fileNames = new String[fileNumber];

          for ( int i = 0; i < fileNumber; i++){
            fileNames[i] = hashtableFiles[i].getPath().getName();
          }

          //package and compress all the hashtable files to an archive file
          String stageId = this.getId();
          String archiveFileName = Utilities.generateTarFileName(stageId);
          localwork.setStageID(stageId);

          CompressionUtils.tar(localPath.toUri().getPath(), fileNames,archiveFileName);
          Path archivePath = Utilities.generateTarPath(localPath, stageId);
          LOG.info("Archive "+ hashtableFiles.length+" hash table files to " + archivePath);

          //upload archive file to hdfs
          Path hdfsFilePath =Utilities.generateTarPath(hdfsPath, stageId);
          short replication = (short) job.getInt("mapred.submit.replication", 10);
          hdfs.copyFromLocalFile(archivePath, hdfsFilePath);
          hdfs.setReplication(hdfsFilePath, replication);
          LOG.info("Upload 1 archive file  from" + archivePath + " to: " + hdfsFilePath);

          //add the archive file to distributed cache
          DistributedCache.createSymlink(job);
          DistributedCache.addCacheArchive(hdfsFilePath.toUri(), job);
          LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri());
        }
      }
      work.configureJobConf(job);
      List<Path> inputPaths = Utilities.getInputPaths(job, mWork, emptyScratchDir, ctx, false);
      Utilities.setInputPaths(job, inputPaths);

      Utilities.setMapRedWork(job, work, ctx.getMRTmpPath());

      if (mWork.getSamplingType() > 0 && rWork != null && job.getNumReduceTasks() > 1) {
        try {
          handleSampling(ctx, mWork, job);
          job.setPartitionerClass(HiveTotalOrderPartitioner.class);
        } catch (IllegalStateException e) {
          console.printInfo("Not enough sampling data.. Rolling back to single reducer task");
          rWork.setNumReduceTasks(1);
          job.setNumReduceTasks(1);
        } catch (Exception e) {
          LOG.error("Sampling error", e);
          console.printError(e.toString(),
              "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
          rWork.setNumReduceTasks(1);
          job.setNumReduceTasks(1);
        }
      }

      jc = new JobClient(job);
      // make this client wait if job tracker is not behaving well.
      Throttle.checkJobTracker(job, LOG);

      if (mWork.isGatheringStats() || (rWork != null && rWork.isGatheringStats())) {
        // initialize stats publishing table
        StatsPublisher statsPublisher;
        StatsFactory factory = StatsFactory.newFactory(job);
        if (factory != null) {
          statsPublisher = factory.getStatsPublisher();
          List<String> statsTmpDir = Utilities.getStatsTmpDirs(mWork, job);
          if (rWork != null) {
            statsTmpDir.addAll(Utilities.getStatsTmpDirs(rWork, job));
          }
          StatsCollectionContext sc = new StatsCollectionContext(job);
          sc.setStatsTmpDirs(statsTmpDir);
          if (!statsPublisher.init(sc)) { // creating stats table if not exists
            if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_STATS_RELIABLE)) {
              throw
                new HiveException(ErrorMsg.STATSPUBLISHER_INITIALIZATION_ERROR.getErrorCodedMsg());
            }
          }
        }
      }

      Utilities.createTmpDirs(job, mWork);
      Utilities.createTmpDirs(job, rWork);

      SessionState ss = SessionState.get();
      // TODO: why is there a TezSession in MR ExecDriver?
      if (ss != null && HiveConf.getVar(job, ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
        // TODO: this is the only place that uses keepTmpDir. Why?
        TezSessionPoolManager.closeIfNotDefault(ss.getTezSession(), true);
      }

      HiveConfUtil.updateJobCredentialProviders(job);
      // Finally SUBMIT the JOB!
      if (taskQueue.isShutdown()) {
        LOG.warn("Task was cancelled");
        return 5;
      }

    rj = jc.submitJob(job);

      if (taskQueue.isShutdown()) {
        LOG.warn("Task was cancelled");
        killJob();
        return 5;
      }

      this.jobID = rj.getJobID();
      updateStatusInQueryDisplay();
      returnVal = jobExecHelper.progress(rj, jc, ctx);
      success = (returnVal == 0);
    } catch (Exception e) {
      setException(e);
      String mesg = " with exception '" + Utilities.getNameMessage(e) + "'";
      if (rj != null) {
        mesg = "Ended Job = " + rj.getJobID() + mesg;
      } else {
        mesg = "Job Submission failed" + mesg;
      }

      // Has to use full name to make sure it does not conflict with
      // org.apache.commons.lang3.StringUtils
      console.printError(mesg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));

      success = false;
      returnVal = 1;
    } finally {
      Utilities.clearWork(job);
      try {
        if (ctxCreated) {
          ctx.clear();
        }

        if (rj != null) {
          if (returnVal != 0) {
            killJob();
          }
          jobID = rj.getID().toString();
        }
        if (jc!=null) {
          jc.close();
        }
      } catch (Exception e) {
        LOG.warn("Failed while cleaning up ", e);
      } finally {
        HadoopJobExecHelper.runningJobs.remove(rj);
      }
    }

    // get the list of Dynamic partition paths
    try {
      if (rj != null) {
        if (mWork.getAliasToWork() != null) {
          for (Operator<? extends OperatorDesc> op : mWork.getAliasToWork().values()) {
            op.jobClose(job, success);
          }
        }
        if (rWork != null) {
          rWork.getReducer().jobClose(job, success);
        }
      }
    } catch (Exception e) {
      // jobClose needs to execute successfully otherwise fail task
      if (success) {
        setException(e);
        success = false;
        returnVal = 3;
        String mesg = "Job Commit failed with exception '" + Utilities.getNameMessage(e) + "'";
        console.printError(mesg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
      }
    }

    return (returnVal);
  }