protected void setJobConf()

in hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamJob.java [734:985]


  /**
   * Creates {@code jobConf_} from {@code config_} and fills it in from the
   * option fields of this object (input/output specs, mapper/combiner/reducer
   * specs, format classes, cache files, and so on).
   *
   * <p>The steps run in this order:
   * <ol>
   *   <li>optionally add {@code additionalConfSpec_} as a resource and build
   *       the {@code JobConf};</li>
   *   <li>register the input paths and choose the input format;</li>
   *   <li>record the I/O identifier settings and the stream input writer
   *       classes;</li>
   *   <li>configure mapper, combiner, number of reduces and reducer;</li>
   *   <li>record the stream output reader classes and the key/value
   *       classes;</li>
   *   <li>configure the record reader, output path/format, partitioner and
   *       debug scripts;</li>
   *   <li>set the job jar and the cache archives/files;</li>
   *   <li>optionally list the resulting properties and print a final
   *       message.</li>
   * </ol>
   *
   * <p>NOTE(review): several branches call {@code fail(...)} and then fall
   * through; this presumably relies on {@code fail} throwing - confirm against
   * its definition, which is not part of this method.
   *
   * @throws IOException declared by this method; among the calls visible here,
   *         {@code URLEncoder.encode(String, String)} declares
   *         {@code UnsupportedEncodingException}, which is an
   *         {@code IOException}. Other sources are not visible from this
   *         method alone.
   */
  protected void setJobConf() throws IOException {
    // Deprecated path: merge an extra configuration resource into config_
    // before the JobConf is created from it.
    if (additionalConfSpec_ != null) {
      LOG.warn("-additionalconfspec option is deprecated, please use -conf instead.");
      config_.addResource(new Path(additionalConfSpec_));
    }

    // general MapRed job properties
    jobConf_ = new JobConf(config_, StreamJob.class);

    // All streaming jobs get the task timeout value
    // from the configuration settings.
    // (Nothing in this method sets a timeout explicitly.)

    // The correct FS must be set before this is called!
    // (to resolve local vs. dfs drive letter differences)
    // (mapreduce.job.working.dir will be lazily initialized ONCE and depends on FS)
    for (int i = 0; i < inputSpecs_.size(); i++) {
      FileInputFormat.addInputPaths(jobConf_,
                        (String) inputSpecs_.get(i));
    }

    // Package of this class, passed to StreamUtil.goodClassOrNull together
    // with every user-supplied class name below.
    String defaultPackage = this.getClass().getPackage().getName();
    // Scratch variable reused for each class lookup in this method.
    Class c;
    // Chosen input format; reused further down for the output format.
    Class fmt = null;
    // Input format selection:
    //  - neither -inputreader nor -inputformat given: TextInputFormat;
    //  - -inputformat names one of the formats checked below (by getName(),
    //    getCanonicalName() or getSimpleName()): that format, except that
    //    KeyValueTextInputFormat and SequenceFileInputFormat are only taken
    //    when no input reader spec is present;
    //  - any other -inputformat value: looked up as a class;
    //  - whenever fmt is still null afterwards: StreamInputFormat.
    if (inReaderSpec_ == null && inputFormatSpec_ == null) {
      fmt = TextInputFormat.class;
    } else if (inputFormatSpec_ != null) {
      if (inputFormatSpec_.equals(TextInputFormat.class.getName())
          || inputFormatSpec_.equals(TextInputFormat.class.getCanonicalName())
          || inputFormatSpec_.equals(TextInputFormat.class.getSimpleName())) {
        fmt = TextInputFormat.class;
      } else if (inputFormatSpec_.equals(KeyValueTextInputFormat.class
          .getName())
          || inputFormatSpec_.equals(KeyValueTextInputFormat.class
              .getCanonicalName())
          || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getSimpleName())) {
        // With an input reader spec, fmt is left null on purpose so that the
        // StreamInputFormat fallback below applies.
        if (inReaderSpec_ == null) {
          fmt = KeyValueTextInputFormat.class;
        }
      // NOTE(review): the first comparison uses the imported
      // SequenceFileInputFormat while the other two spell out
      // org.apache.hadoop.mapred.SequenceFileInputFormat; presumably the same
      // class - confirm against the file's imports.
      } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class
          .getName())
          || inputFormatSpec_
              .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class
                  .getCanonicalName())
          || inputFormatSpec_
              .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getSimpleName())) {
        // Same rule as for KeyValueTextInputFormat above.
        if (inReaderSpec_ == null) {
          fmt = SequenceFileInputFormat.class;
        }
      } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
          .getName())
          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
              .getCanonicalName())
          || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getSimpleName())) {
        fmt = SequenceFileAsTextInputFormat.class;
      } else {
        // Not one of the names checked above: treat the spec as a class name.
        c = StreamUtil.goodClassOrNull(jobConf_, inputFormatSpec_, defaultPackage);
        if (c != null) {
          fmt = c;
        } else {
          fail("-inputformat : class not found : " + inputFormatSpec_);
        }
      }
    }
    if (fmt == null) {
      fmt = StreamInputFormat.class;
    }

    jobConf_.setInputFormat(fmt);

    // A single I/O spec applies to all four stream endpoints.
    if (ioSpec_ != null) {
      jobConf_.set("stream.map.input", ioSpec_);
      jobConf_.set("stream.map.output", ioSpec_);
      jobConf_.set("stream.reduce.input", ioSpec_);
      jobConf_.set("stream.reduce.output", ioSpec_);
    }

    // The resolver class is configurable through
    // "stream.io.identifier.resolver.class" and defaults to IdentifierResolver.
    Class<? extends IdentifierResolver> idResolverClass =
      jobConf_.getClass("stream.io.identifier.resolver.class",
        IdentifierResolver.class, IdentifierResolver.class);
    IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);

    // The same resolver instance is re-used for every identifier: each
    // resolve() call is immediately followed by the getters that read its
    // result, so the order of these statements must be kept.
    // (Presumably resolve() replaces the classes the getters return - confirm
    // against IdentifierResolver.)
    idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
    jobConf_.setClass("stream.map.input.writer.class",
      idResolver.getInputWriterClass(), InputWriter.class);

    idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
    jobConf_.setClass("stream.reduce.input.writer.class",
      idResolver.getInputWriterClass(), InputWriter.class);

    jobConf_.set("stream.addenvironment", addTaskEnvironment_);

    // Mapper: if the spec yields a class from goodClassOrNull it is used as
    // the mapper class; otherwise the spec is treated as a command, run through
    // PipeMapper/PipeMapRunner and stored URL-encoded (UTF-8).
    boolean isMapperACommand = false;
    if (mapCmd_ != null) {
      c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setMapperClass(c);
      } else {
        isMapperACommand = true;
        jobConf_.setMapperClass(PipeMapper.class);
        jobConf_.setMapRunnerClass(PipeMapRunner.class);
        jobConf_.set("stream.map.streamprocessor",
                     URLEncoder.encode(mapCmd_, "UTF-8"));
      }
    }

    // Combiner: same class-or-command rule as the mapper, using PipeCombiner.
    if (comCmd_ != null) {
      c = StreamUtil.goodClassOrNull(jobConf_, comCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setCombinerClass(c);
      } else {
        jobConf_.setCombinerClass(PipeCombiner.class);
        jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(
                comCmd_, "UTF-8"));
      }
    }

    // Integer.parseInt throws NumberFormatException for a non-numeric spec;
    // that exception is not handled here.
    if (numReduceTasksSpec_!= null) {
      int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
      jobConf_.setNumReduceTasks(numReduceTasks);
    }

    // Reducer: REDUCE_NONE forces zero reduce tasks (overriding the count set
    // just above). A reducer is only configured when the reduce count is
    // non-zero.
    boolean isReducerACommand = false;
    if (redCmd_ != null) {
      if (redCmd_.equals(REDUCE_NONE)) {
        jobConf_.setNumReduceTasks(0);
      }
      if (jobConf_.getNumReduceTasks() != 0) {
        // "aggregate" (compared case-insensitively) selects the value
        // aggregator reducer and also sets the combiner, replacing any
        // combiner chosen from comCmd_ above.
        if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
          jobConf_.setReducerClass(ValueAggregatorReducer.class);
          jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
        } else {

          // Same class-or-command rule as the mapper, using PipeReducer.
          c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
          if (c != null) {
            jobConf_.setReducerClass(c);
          } else {
            isReducerACommand = true;
            jobConf_.setReducerClass(PipeReducer.class);
            jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
                redCmd_, "UTF-8"));
          }
        }
      }
    }

    // Map side output: the resolver now describes "stream.map.output"
    // (TEXT_ID when the property is unset).
    idResolver.resolve(jobConf_.get("stream.map.output",
        IdentifierResolver.TEXT_ID));
    jobConf_.setClass("stream.map.output.reader.class",
      idResolver.getOutputReaderClass(), OutputReader.class);
    // Also taken when "stream.map.output" is present in the configuration,
    // e.g. because ioSpec_ was set above.
    if (isMapperACommand || jobConf_.get("stream.map.output") != null) {
      // if mapper is a command, then map output key/value classes come from the
      // idResolver
      jobConf_.setMapOutputKeyClass(idResolver.getOutputKeyClass());
      jobConf_.setMapOutputValueClass(idResolver.getOutputValueClass());

      // With zero reduce tasks the same classes are also set as the job's
      // output key/value classes.
      if (jobConf_.getNumReduceTasks() == 0) {
        jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
        jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
      }
    }

    // Reduce side output: the resolver is re-pointed at
    // "stream.reduce.output" before its getters are read again.
    idResolver.resolve(jobConf_.get("stream.reduce.output",
        IdentifierResolver.TEXT_ID));
    jobConf_.setClass("stream.reduce.output.reader.class",
      idResolver.getOutputReaderClass(), OutputReader.class);
    if (isReducerACommand || jobConf_.get("stream.reduce.output") != null) {
      // if reducer is a command, then output key/value classes come from the
      // idResolver
      jobConf_.setOutputKeyClass(idResolver.getOutputKeyClass());
      jobConf_.setOutputValueClass(idResolver.getOutputValueClass());
    }

    // Input reader spec format: "<class>[,name=value]...". The first element
    // is the record reader class; every further element is stored as
    // "stream.recordreader.<name>" with an empty value when "=" is missing.
    if (inReaderSpec_ != null) {
      String[] args = inReaderSpec_.split(",");
      String readerClass = args[0];
      // this argument can only be a Java class
      c = StreamUtil.goodClassOrNull(jobConf_, readerClass, defaultPackage);
      if (c != null) {
        jobConf_.set("stream.recordreader.class", c.getName());
      } else {
        fail("-inputreader: class not found: " + readerClass);
      }
      for (int i = 1; i < args.length; i++) {
        // Split on the first "=" only, so the value may itself contain "=".
        String[] nv = args[i].split("=", 2);
        String k = "stream.recordreader." + nv[0];
        String v = (nv.length > 1) ? nv[1] : "";
        jobConf_.set(k, v);
      }
    }

    FileOutputFormat.setOutputPath(jobConf_, new Path(output_));
    // fmt is reused here for the output format; TextOutputFormat is the
    // default when no output format spec is given.
    fmt = null;
    if (outputFormatSpec_!= null) {
      c = StreamUtil.goodClassOrNull(jobConf_, outputFormatSpec_, defaultPackage);
      if (c != null) {
        fmt = c;
      } else {
        fail("-outputformat : class not found : " + outputFormatSpec_);
      }
    }
    if (fmt == null) {
      fmt = TextOutputFormat.class;
    }
    // With lazyOutput_ the chosen format is registered through
    // LazyOutputFormat instead of being set directly on the job.
    if (lazyOutput_) {
      LazyOutputFormat.setOutputFormatClass(jobConf_, fmt);
    } else {
      jobConf_.setOutputFormat(fmt);
    }

    // Unlike mapper/combiner/reducer, the partitioner must resolve to a class.
    if (partitionerSpec_!= null) {
      c = StreamUtil.goodClassOrNull(jobConf_, partitionerSpec_, defaultPackage);
      if (c != null) {
        jobConf_.setPartitionerClass(c);
      } else {
        fail("-partitioner : class not found : " + partitionerSpec_);
      }
    }

    if(mapDebugSpec_ != null){
    	jobConf_.setMapDebugScript(mapDebugSpec_);
    }
    if(reduceDebugSpec_ != null){
    	jobConf_.setReduceDebugScript(reduceDebugSpec_);
    }
    // last, allow user to override anything
    // (although typically used with properties we didn't touch)
    // NOTE(review): no override logic follows in this method; the comment
    // above looks like a leftover - confirm before relying on it.

    // Job jar: only set when packageJobJar() returns a non-null value.
    jar_ = packageJobJar();
    if (jar_ != null) {
      jobConf_.setJar(jar_);
    }

    // Cache files/archives: presumably getURIs() fills fileURIs and
    // archiveURIs from the two specs (confirm against its definition); the
    // job fails with LINK_URI when DistributedCache.checkURIs rejects them.
    if ((cacheArchives != null) || (cacheFiles != null)){
      getURIs(cacheArchives, cacheFiles);
      boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
      if (!b)
        fail(LINK_URI);
    }
    // set the jobconf for the caching parameters
    if (cacheArchives != null) {
      Job.setCacheArchives(archiveURIs, jobConf_);
    }
    if (cacheFiles != null) {
      Job.setCacheFiles(fileURIs, jobConf_);
    }

    if (verbose_) {
      listJobConfProperties();
    }

    msg("submitting to jobconf: " + getJobTrackerHostPort());
  }