public static Pipeline fromJobConf()

in odps-sdk/odps-sdk-mapred/src/main/java/com/aliyun/odps/pipeline/Pipeline.java [1102:1238]


  /**
   * Rebuilds a {@link Pipeline} from the settings stored in a job configuration.
   *
   * <p>The value stored under {@code PIPELINE_LIST} is a comma-separated list of
   * {@code <kind>:<class name>} entries, where {@code kind} is {@code map} or
   * {@code reduce}. For the entry at index {@code i}, optional per-stage settings
   * (key/value schema, sort columns and order, partition columns and class,
   * grouping columns, key comparator classes) are read from keys of the form
   * {@code PIPELINE + i + <suffix>} and forwarded to the builder when present.
   *
   * <p>The task count of stage 0 comes from {@code odps.stage.mapper.num}; for
   * any later stage it comes from {@code odps.stage.reducer.<i>.num}, falling
   * back to {@code odps.stage.reducer.num}. The default is 1 in every case.
   *
   * @param conf job configuration to read the pipeline description from
   * @return the pipeline created by the builder, or {@code null} when
   *         {@code PIPELINE_LIST} is not set in {@code conf}
   * @throws RuntimeException if a list entry is not of the form
   *         {@code <kind>:<class name>}, if a configured class cannot be loaded
   *         (the {@link ClassNotFoundException} is attached as the cause), or if
   *         a configured class does not have the type required for its role
   */
  public static Pipeline fromJobConf(JobConf conf) {
    String pipes = conf.get(PIPELINE_LIST);
    if (pipes == null) {
      return null;
    }

    Builder builder = builder();
    String[] pipelist = pipes.split(",");
    for (int i = 0; i < pipelist.length; i++) {
      String pipe = pipelist[i];
      String[] parts = pipe.split(":");
      // Fail with a descriptive message instead of an ArrayIndexOutOfBoundsException
      // when an entry has no ':' separator (this also covers an empty list value).
      if (parts.length < 2) {
        throw new RuntimeException("Invalid pipeline stage '" + pipe + "' at index " + i
            + ", expected <map|reduce>:<class name>");
      }

      // set class name
      String kind = parts[0];
      Class<?> stageClass = resolveConfiguredClass(conf, parts[1]);
      if (kind.equals("map")) {
        builder.addMapper(requireSubclass(stageClass, Mapper.class));
      } else if (kind.equals("reduce")) {
        builder.addReducer(requireSubclass(stageClass, Reducer.class));
      }
      // NOTE(review): any other kind adds no stage, yet the settings below are
      // still applied to the builder -- presumably to the previously added stage.
      // Confirm whether an unknown kind should be rejected instead.

      // other properties
      String keySchema = conf.get(PIPELINE + i + OUTPUT_KEY_SCHEMA);
      if (keySchema != null) {
        builder.setOutputKeySchema(SchemaUtils.fromString(keySchema));
      }

      String valueSchema = conf.get(PIPELINE + i + OUTPUT_VALUE_SCHEMA);
      if (valueSchema != null) {
        builder.setOutputValueSchema(SchemaUtils.fromString(valueSchema));
      }

      // NOTE(review): unlike the other column lists below, an empty value is not
      // skipped here, so "" yields a single empty column name -- confirm intent.
      String sortCols = conf.get(PIPELINE + i + OUTPUT_KEY_SORT_COLUMNS);
      if (sortCols != null) {
        builder.setOutputKeySortColumns(sortCols.split(","));
      }

      String joined = conf.get(PIPELINE + i + OUTPUT_KEY_SORT_ORDER);
      if (joined != null && !joined.isEmpty()) {
        String[] orders = joined.split(",");
        SortOrder[] order = new SortOrder[orders.length];
        for (int j = 0; j < order.length; j++) {
          order[j] = SortOrder.valueOf(orders[j]);
        }
        builder.setOutputKeySortOrder(order);
      }

      String partCols = conf.get(PIPELINE + i + PARTITION_COLUMNS);
      if (partCols != null && !partCols.isEmpty()) {
        builder.setPartitionColumns(partCols.split(","));
      }

      String partClass = conf.get(PIPELINE + i + PARTITION_CLASS);
      if (partClass != null && !partClass.isEmpty()) {
        builder.setPartitionerClass(
            requireSubclass(resolveConfiguredClass(conf, partClass), Partitioner.class));
      }

      String groupCols = conf.get(PIPELINE + i + OUTPUT_GROUP_COLUMNS);
      if (groupCols != null && !groupCols.isEmpty()) {
        builder.setOutputGroupingColumns(groupCols.split(","));
      }

      String keyCmpClass = conf.get(PIPELINE + i + OUTPUT_KEY_COMPARATOR_CLASS);
      if (keyCmpClass != null && !keyCmpClass.isEmpty()) {
        builder.setOutputKeyComparatorClass(
            requireSubclass(resolveConfiguredClass(conf, keyCmpClass), RecordComparator.class));
      }

      String keyGrpCmpClass = conf.get(PIPELINE + i + OUTPUT_KEY_GROUPING_COMPARATOR_CLASS);
      if (keyGrpCmpClass != null && !keyGrpCmpClass.isEmpty()) {
        builder.setOutputKeyGroupingComparatorClass(
            requireSubclass(resolveConfiguredClass(conf, keyGrpCmpClass), RecordComparator.class));
      }

      // Stage 0 uses the mapper task count; later stages use their own reducer
      // count, falling back to the global reducer count.
      int numTasks;
      if (i == 0) {
        numTasks = conf.getInt("odps.stage.mapper.num", 1);
      } else {
        numTasks = conf.getInt("odps.stage.reducer." + i + ".num",
            conf.getInt("odps.stage.reducer.num", 1));
      }
      builder.setNumTasks(numTasks);
    }
    return builder.createPipeline();
  }

  /**
   * Loads a class by name through the job configuration, converting a failed
   * lookup into a {@link RuntimeException} that keeps the original cause.
   */
  private static Class<?> resolveConfiguredClass(JobConf conf, String className) {
    try {
      Class<?> cls = conf.getClassByName(className);
      if (cls == null) {
        throw new RuntimeException("Class " + className + " not found");
      }
      return cls;
    } catch (ClassNotFoundException e) {
      throw new RuntimeException("Class " + className + " not found", e);
    }
  }

  /**
   * Returns {@code cls} narrowed to a subclass of {@code expected}, or throws a
   * {@link RuntimeException} naming both types when it is not assignable.
   */
  private static <T> Class<? extends T> requireSubclass(Class<?> cls, Class<T> expected) {
    if (!expected.isAssignableFrom(cls)) {
      throw new RuntimeException(cls + " not " + expected.getSimpleName());
    }
    return cls.asSubclass(expected);
  }