public static void toJobConf()

in odps-sdk/odps-sdk-mapred/src/main/java/com/aliyun/odps/pipeline/Pipeline.java [1008:1093]


  public static void toJobConf(JobConf conf, Pipeline pipeline) {
    StringBuilder sb = new StringBuilder();
    List<TransformNode> nodes = pipeline.getNodes();
    for (int i = 0; i < nodes.size(); i++) {
      TransformNode node = nodes.get(i);
      sb.append(node.type);
      sb.append(":");
      sb.append(node.getTransformClass().getName());
      if (i != nodes.size() - 1) {
        sb.append(",");
      }

      if (node.getOutputKeySchema() != null) {
        conf.set(PIPELINE + i + OUTPUT_KEY_SCHEMA,
                 SchemaUtils.toString(node.getOutputKeySchema()));
      }

      if (node.getOutputValueSchema() != null) {
        conf.set(PIPELINE + i + OUTPUT_VALUE_SCHEMA,
                 SchemaUtils.toString(node.getOutputValueSchema()));
      }

      if (node.getOutputKeySortColumns() != null) {
        conf.set(PIPELINE + i + OUTPUT_KEY_SORT_COLUMNS,
                 StringUtils.join(node.getOutputKeySortColumns(), ","));
      }

      conf.set(PIPELINE + i + OUTPUT_KEY_SORT_ORDER,
          StringUtils.join(node.getOutputKeySortOrder(), ","));

      if (node.getPartitionColumns() != null) {
        conf.set(PIPELINE + i + PARTITION_COLUMNS,
                 StringUtils.join(node.getPartitionColumns(), ","));
      }

      if (node.getPartitionerClass() != null) {
        conf.set(PIPELINE + i + PARTITION_CLASS,
                 node.getPartitionerClass().getName());
      }

      if (node.getOutputGroupingColumns() != null) {
        conf.set(PIPELINE + i + OUTPUT_GROUP_COLUMNS,
                 StringUtils.join(node.getOutputGroupingColumns(), ","));
      }

      if (node.getOutputKeyComparatorClass() != null) {
        conf.set(PIPELINE + i + OUTPUT_KEY_COMPARATOR_CLASS, 
            node.getOutputKeyComparatorClass().getName());
      }

      if (node.getOutputKeyGroupingComparatorClass() != null) {
        conf.set(PIPELINE + i + OUTPUT_KEY_GROUPING_COMPARATOR_CLASS, 
            node.getOutputKeyGroupingComparatorClass().getName());
      }

      if (node.getNumTasks() >= 0) {
        if (i == 0) {
          conf.setInt("odps.stage.mapper.num", node.getNumTasks());
        } else {
          conf.setInt("odps.stage.reducer." + i + ".num", node.getNumTasks());
        }
        conf.setBoolean("odps.sql.jobconf.odps2", true);
        conf.setBoolean("odps.optimizer.cbo.enable.dynamic.parallelism", false);
      }

      if (node.getMemoryForTask() >= 0) {
        if (i == 0) {
          conf.setInt("odps.stage.mapper.mem", node.getMemoryForTask());
        } else {
          conf.setInt("odps.stage.reducer." + i + ".mem", node.getMemoryForTask());
        }
        conf.setBoolean("odps.sql.jobconf.odps2", true);
      }

      if (node.getMemoryForJVM() >= 0) {
        if (i == 0) {
          conf.setInt("odps.stage.mapper.jvm.mem", node.getMemoryForJVM());
        } else {
          conf.setInt("odps.stage.reducer." + i + ".jvm.mem", node.getMemoryForJVM());
        }
        conf.setBoolean("odps.sql.jobconf.odps2", true);
      }
    }

    conf.set(PIPELINE_LIST, sb.toString());
  }