protected void setJobConf()

in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/streaming/StreamJob.java [590:789]


  protected void setJobConf() throws IOException {

    // general MapRed job properties
    jobConf_ = new JobConf(config_);

    // register every input table spec on the job configuration

    for (int i = 0; i < inputSpecs_.size(); i++) {
      InputUtils.addTable(inputSpecs_.get(i), jobConf_);
    }
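    // Each entry of inputSpecs_ is registered via InputUtils.addTable;
    // presumably each entry is a TableInfo parsed earlier from an -input
    // argument (a table name plus an optional partition spec).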

    String defaultPackage = this.getClass().getPackage().getName();

    if (ioSpec_ != null) {
      jobConf_.set("stream.map.input", ioSpec_);
      jobConf_.set("stream.map.output", ioSpec_);
      jobConf_.set("stream.reduce.input", ioSpec_);
      jobConf_.set("stream.reduce.output", ioSpec_);
    }

    //Class<? extends IdentifierResolver> idResolverClass =
    //  jobConf_.getClass("stream.io.identifier.resolver.class",
    //    IdentifierResolver.class, IdentifierResolver.class);
    //IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);

    //idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
    //jobConf_.setClass("stream.map.input.writer.class",
    //  idResolver.getInputWriterClass(), InputWriter.class);
    jobConf_.setClass("stream.map.input.writer.class", RecordInputWriter.class, InputWriter.class);

    //idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
    //jobConf_.setClass("stream.reduce.input.writer.class",
    //  idResolver.getInputWriterClass(), InputWriter.class);
    jobConf_.setClass("stream.reduce.input.writer.class", TextInputWriter.class, InputWriter.class);

    jobConf_.set("stream.addenvironment", addTaskEnvironment_);

    boolean isMapperACommand = false;
    Class c = null;
    if (mapCmd_ != null) {
      c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setMapperClass(c);
      } else {
        isMapperACommand = true;
        jobConf_.setMapperClass(PipeMapper.class);
        //jobConf_.setMapRunnerClass(PipeMapRunner.class);
        jobConf_.set("stream.map.streamprocessor",
                     URLEncoder.encode(mapCmd_, "UTF-8"));
      }
    }
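    // Resolution rule: if the mapper spec names a loadable class it is set
    // directly as the mapper; otherwise the value is treated as an external
    // command run through PipeMapper. Illustrative example: a mapper command
    // of "/bin/cat" would store stream.map.streamprocessor=%2Fbin%2Fcat
    // (URL-encoded).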

    if (comCmd_ != null) {
      c = StreamUtil.goodClassOrNull(jobConf_, comCmd_, defaultPackage);
      if (c != null) {
        jobConf_.setCombinerClass(c);
      } else {
        jobConf_.setCombinerClass(PipeCombiner.class);
        jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(
            comCmd_, "UTF-8"));
      }
    }

    if (numReduceTasksSpec_ != null) {
      int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
      jobConf_.setNumReduceTasks(numReduceTasks);
    }

    boolean isReducerACommand = false;
    if (redCmd_ != null) {
      if (redCmd_.equals(REDUCE_NONE)) {
        jobConf_.setNumReduceTasks(0);
      }
      if (jobConf_.getNumReduceTasks() != 0) {
        if (redCmd_.equalsIgnoreCase("aggregate")) {
          //jobConf_.setReducerClass(ValueAggregatorReducer.class);
          //jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
          // TODO reducer lib
          throw new UnsupportedOperationException("'aggregate' reducer not supported yet");
        } else {
          c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
          if (c != null) {
            jobConf_.setReducerClass(c);
          } else {
            isReducerACommand = true;
            jobConf_.setReducerClass(PipeReducer.class);
            jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
                redCmd_, "UTF-8"));
          }
        }
      }
    }
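    // Same rule for the reducer: REDUCE_NONE disables the reduce stage, a
    // loadable class is used as-is, and anything else is treated as an
    // external command run through PipeReducer, URL-encoded into
    // stream.reduce.streamprocessor. The Hadoop-style "aggregate" shortcut is
    // rejected explicitly.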

    String mapOutputFieldSeparator =
        unescapeSeparator(jobConf_.get("stream.map.output.field.separator", "\t"));
    String reduceInputFieldSeparator =
        unescapeSeparator(jobConf_.get("stream.reduce.input.field.separator", "\t"));
    int numOfMapOutputKeyFields = jobConf_.getInt("stream.num.map.output.key.fields", 1);

    if (numOfMapOutputKeyFields > 1 && !mapOutputFieldSeparator.equals(reduceInputFieldSeparator)) {
      throw new IllegalArgumentException(
          "for a multi-field key, stream.reduce.input.field.separator should be the same as stream.map.output.field.separator to avoid ambiguity");
    }
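    // Rationale for the check above (as the message says): with a multi-field
    // key, the reduce side re-splits its input text on
    // stream.reduce.input.field.separator, so a different separator on the map
    // output side would make the key boundaries ambiguous.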

    Column[] mapOutputKeySchema = new Column[numOfMapOutputKeyFields];

    Map<Integer, KeyDescription> keyOptions =
        parseKeyOptions(jobConf_.get("stream.map.output.key.options", ""));

    for (int i = 0; i < mapOutputKeySchema.length; i++) {
      KeyDescription keyDesc = keyOptions.get(i + 1);
      OdpsType t = (keyDesc == null || !keyDesc.numeric) ? OdpsType.STRING : OdpsType.BIGINT;
      mapOutputKeySchema[i] = new Column("map_out_key" + i, t);
    }
    jobConf_.setMapOutputKeySchema(mapOutputKeySchema);

    if (!keyOptions.isEmpty()) {
      JobConf.SortOrder[] sortOrder = new JobConf.SortOrder[mapOutputKeySchema.length];
      for (int i = 0; i < mapOutputKeySchema.length; i++) {
        KeyDescription keyDesc = keyOptions.get(i + 1);
        sortOrder[i] =
            (keyDesc == null || !keyDesc.reverse) ? JobConf.SortOrder.ASC : JobConf.SortOrder.DESC;
      }
      jobConf_.setOutputKeySortOrder(sortOrder);
    }
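    // Key columns default to STRING and ascending order; a key position marked
    // numeric in stream.map.output.key.options becomes BIGINT so it sorts by
    // value, and a position marked reverse sorts DESC. The exact option syntax
    // is defined by parseKeyOptions (not shown in this excerpt).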

    jobConf_.setMapOutputValueSchema(new Column[]{new Column("map_out_value", OdpsType.STRING)});

    // use setPartitionColumns for KeyFieldBasedPartitioner
    if (partitionerSpec_ != null) {
      if (partitionerSpec_.equals("KeyFieldBasedPartitioner")) {
        partitionerSpec_ = "com.aliyun.odps.mapred.lib.KeyFieldBasedPartitioner";
      }
      if (partitionerSpec_.equals("com.aliyun.odps.mapred.lib.KeyFieldBasedPartitioner")) {
        String mapOutputKeyFieldSeparator =
            unescapeSeparator(jobConf_.get("map.output.key.field.separator", "\t"));
        if (mapOutputFieldSeparator.equals(mapOutputKeyFieldSeparator)) {
          int numOfKeyFieldsForPartition = jobConf_.getInt("num.key.fields.for.partition", 1);
          if (numOfKeyFieldsForPartition > numOfMapOutputKeyFields) {
            throw new IllegalArgumentException(
                "num.key.fields.for.partition should not be bigger than stream.num.map.output.key.fields");
          }
          if (numOfKeyFieldsForPartition < numOfMapOutputKeyFields) {
            String[] partitionColumns = new String[numOfKeyFieldsForPartition];
            for (int i = 0; i < numOfKeyFieldsForPartition; i++) {
              partitionColumns[i] = mapOutputKeySchema[i].getName();
            }
            jobConf_.setPartitionColumns(partitionColumns);
          }
        } else {
          // need to split the first field for partitioning; only for compatibility with Hadoop.
          // FIXME this partitioner would be implemented by the StreamingOperator at runtime...
          c = StreamUtil.goodClassOrNull(jobConf_, partitionerSpec_, defaultPackage);
          if (c != null) {
            jobConf_.setPartitionerClass(c);
          }
        }
      } else {
        throw new IllegalArgumentException(
            "User-defined partitioner not supported for streaming jobs");
      }
    }
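    // Only KeyFieldBasedPartitioner is accepted. When the key-field separator
    // equals the map output field separator, the first
    // num.key.fields.for.partition key columns are simply declared as
    // partition columns; otherwise the compatibility partitioner class is set
    // so the first field can be re-split at runtime (see the FIXME above).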

    Class mapOutputReaderClass = TextOutputReader.class;
    Class reduceOutputReaderClass = RecordOutputReader.class;
    if (jobConf_.getNumReduceTasks() > 0) {
      boolean hasKey = jobConf_.getInt("stream.num.reduce.output.key.fields", 0) > 0;
      reduceOutputReaderClass = hasKey ? TextOutputReader.class : RecordOutputReader.class;
    } else {
      boolean hasKey = jobConf_.getInt("stream.num.map.output.key.fields", 0) > 0;
      mapOutputReaderClass = hasKey ? TextOutputReader.class : RecordOutputReader.class;
    }
    jobConf_.setClass("stream.map.output.reader.class", mapOutputReaderClass, OutputReader.class);
    jobConf_.setClass("stream.reduce.output.reader.class", reduceOutputReaderClass,
        OutputReader.class);

    // XXX no-output allowed
    if (output_ != null) {
      OutputUtils.addTable(parseTableInfo(output_), jobConf_);
    }

    //if(mapDebugSpec_ != null){
    //	jobConf_.setMapDebugScript(mapDebugSpec_);
    //}
    //if(reduceDebugSpec_ != null){
    //	jobConf_.setReduceDebugScript(reduceDebugSpec_);
    //}
    // last, allow user to override anything
    // (although typically used with properties we didn't touch)

    // FIXME resources linkname

    if (verbose_) {
      listJobConfProperties();
    }
  }
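
A minimal sketch of the kind of configuration this method ends up producing for
a piped command mapper with a single STRING key column. This is illustrative
only: the pipedCatJob helper is hypothetical, it assumes JobConf has a
no-argument constructor, and imports are as in StreamJob.java; class and
property names are the ones used above.

  static JobConf pipedCatJob() throws IOException {
    JobConf conf = new JobConf();
    // External command wrapped by PipeMapper, URL-encoded as setJobConf does.
    conf.setMapperClass(PipeMapper.class);
    conf.set("stream.map.streamprocessor", URLEncoder.encode("/bin/cat", "UTF-8"));
    // One STRING key column and the single STRING value column.
    conf.setMapOutputKeySchema(new Column[]{new Column("map_out_key0", OdpsType.STRING)});
    conf.setMapOutputValueSchema(new Column[]{new Column("map_out_value", OdpsType.STRING)});
    conf.setNumReduceTasks(1);
    return conf;
  }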