in odps-sdk/odps-sdk-mapred/src/main/java/com/aliyun/odps/pipeline/Pipeline.java [1102:1238]
/**
 * Rebuilds a {@link Pipeline} from the properties stored in a job configuration.
 *
 * <p>The value of {@code PIPELINE_LIST} is read as a comma-separated list of
 * {@code type:className} entries, where {@code type} is {@code map} or {@code reduce}.
 * For the entry at index {@code i}, the optional per-stage properties keyed by
 * {@code PIPELINE + i + <suffix>} (key/value schema, sort columns and order, partition
 * columns, partitioner class, grouping columns, comparator classes) are applied to the
 * builder, followed by the task count for that stage.
 *
 * @param conf job configuration to read the pipeline description from
 * @return the pipeline created by the builder, or {@code null} when {@code PIPELINE_LIST}
 *         is not set in {@code conf}
 * @throws RuntimeException if an entry is not of the form {@code type:className}, if a
 *         configured class cannot be loaded, or if a loaded class is not assignable to
 *         the type required by its role
 */
public static Pipeline fromJobConf(JobConf conf) {
  String pipes = conf.get(PIPELINE_LIST);
  if (pipes == null) {
    return null;
  }

  Builder builder = builder();
  String[] pipelist = pipes.split(",");
  for (int i = 0; i < pipelist.length; i++) {
    String pipe = pipelist[i];
    String[] parts = pipe.split(":");
    // String.split drops trailing empty strings, so "map" and "map:" both yield a
    // single element; report that explicitly instead of failing on parts[1].
    if (parts.length < 2) {
      throw new RuntimeException(
          "Invalid pipeline entry '" + pipe + "' at index " + i + ", expected <type>:<class>");
    }
    String stageType = parts[0];
    String stageClassName = parts[1];

    // The class is resolved before the type is inspected so that a missing class is
    // reported regardless of the stage type.
    Class<?> stageClass = resolvePipelineClass(conf, stageClassName);
    if ("map".equals(stageType)) {
      builder.addMapper(requirePipelineSubclass(stageClass, Mapper.class, "Mapper"));
    } else if ("reduce".equals(stageType)) {
      builder.addReducer(requirePipelineSubclass(stageClass, Reducer.class, "Reducer"));
    }
    // NOTE(review): any other stage type adds no node, yet the per-stage properties
    // below are still applied to the builder - confirm whether such entries should be
    // rejected instead.

    String stagePrefix = PIPELINE + i;

    String keySchema = conf.get(stagePrefix + OUTPUT_KEY_SCHEMA);
    if (keySchema != null) {
      builder.setOutputKeySchema(SchemaUtils.fromString(keySchema));
    }

    String valueSchema = conf.get(stagePrefix + OUTPUT_VALUE_SCHEMA);
    if (valueSchema != null) {
      builder.setOutputValueSchema(SchemaUtils.fromString(valueSchema));
    }

    // An empty value would split into a single empty-string column, so it is skipped
    // in the same way as the other comma-separated properties below.
    String sortCols = conf.get(stagePrefix + OUTPUT_KEY_SORT_COLUMNS);
    if (sortCols != null && !sortCols.isEmpty()) {
      builder.setOutputKeySortColumns(sortCols.split(","));
    }

    String joinedOrders = conf.get(stagePrefix + OUTPUT_KEY_SORT_ORDER);
    if (joinedOrders != null && !joinedOrders.isEmpty()) {
      String[] orderNames = joinedOrders.split(",");
      SortOrder[] order = new SortOrder[orderNames.length];
      for (int j = 0; j < order.length; j++) {
        order[j] = SortOrder.valueOf(orderNames[j]);
      }
      builder.setOutputKeySortOrder(order);
    }

    String partCols = conf.get(stagePrefix + PARTITION_COLUMNS);
    if (partCols != null && !partCols.isEmpty()) {
      builder.setPartitionColumns(partCols.split(","));
    }

    String partClass = conf.get(stagePrefix + PARTITION_CLASS);
    if (partClass != null && !partClass.isEmpty()) {
      builder.setPartitionerClass(
          requirePipelineSubclass(
              resolvePipelineClass(conf, partClass), Partitioner.class, "Partitioner"));
    }

    String groupCols = conf.get(stagePrefix + OUTPUT_GROUP_COLUMNS);
    if (groupCols != null && !groupCols.isEmpty()) {
      builder.setOutputGroupingColumns(groupCols.split(","));
    }

    String keyCmpClass = conf.get(stagePrefix + OUTPUT_KEY_COMPARATOR_CLASS);
    if (keyCmpClass != null && !keyCmpClass.isEmpty()) {
      builder.setOutputKeyComparatorClass(
          requirePipelineSubclass(
              resolvePipelineClass(conf, keyCmpClass), RecordComparator.class, "RecordComparator"));
    }

    String keyGrpCmpClass = conf.get(stagePrefix + OUTPUT_KEY_GROUPING_COMPARATOR_CLASS);
    if (keyGrpCmpClass != null && !keyGrpCmpClass.isEmpty()) {
      builder.setOutputKeyGroupingComparatorClass(
          requirePipelineSubclass(
              resolvePipelineClass(conf, keyGrpCmpClass),
              RecordComparator.class,
              "RecordComparator"));
    }

    // Index 0 reads the mapper count; every later index reads its own reducer count
    // and falls back to the global reducer count, then to 1.
    final int numTasks;
    if (i == 0) {
      numTasks = conf.getInt("odps.stage.mapper.num", 1);
    } else {
      numTasks = conf.getInt("odps.stage.reducer." + i + ".num",
          conf.getInt("odps.stage.reducer.num", 1));
    }
    builder.setNumTasks(numTasks);
  }
  return builder.createPipeline();
}

/** Loads {@code className} through {@code conf}, failing with a RuntimeException if absent. */
private static Class<?> resolvePipelineClass(JobConf conf, String className) {
  Class<?> cls;
  try {
    cls = conf.getClassByName(className);
  } catch (ClassNotFoundException e) {
    // Keep the original exception as the cause so the loader failure stays visible.
    throw new RuntimeException("Class " + className + " not found", e);
  }
  if (cls == null) {
    throw new RuntimeException("Class " + className + " not found");
  }
  return cls;
}

/** Narrows {@code cls} to a subclass of {@code expected}, or fails with "<cls> not <label>". */
private static <T> Class<? extends T> requirePipelineSubclass(
    Class<?> cls, Class<T> expected, String label) {
  if (!expected.isAssignableFrom(cls)) {
    throw new RuntimeException(cls + " not " + label);
  }
  return cls.asSubclass(expected);
}