in odps-sdk/odps-sdk-mapred/src/main/java/com/aliyun/odps/pipeline/Pipeline.java [1008:1093]
public static void toJobConf(JobConf conf, Pipeline pipeline) {
StringBuilder sb = new StringBuilder();
List<TransformNode> nodes = pipeline.getNodes();
for (int i = 0; i < nodes.size(); i++) {
TransformNode node = nodes.get(i);
sb.append(node.type);
sb.append(":");
sb.append(node.getTransformClass().getName());
if (i != nodes.size() - 1) {
sb.append(",");
}
if (node.getOutputKeySchema() != null) {
conf.set(PIPELINE + i + OUTPUT_KEY_SCHEMA,
SchemaUtils.toString(node.getOutputKeySchema()));
}
if (node.getOutputValueSchema() != null) {
conf.set(PIPELINE + i + OUTPUT_VALUE_SCHEMA,
SchemaUtils.toString(node.getOutputValueSchema()));
}
if (node.getOutputKeySortColumns() != null) {
conf.set(PIPELINE + i + OUTPUT_KEY_SORT_COLUMNS,
StringUtils.join(node.getOutputKeySortColumns(), ","));
}
conf.set(PIPELINE + i + OUTPUT_KEY_SORT_ORDER,
StringUtils.join(node.getOutputKeySortOrder(), ","));
if (node.getPartitionColumns() != null) {
conf.set(PIPELINE + i + PARTITION_COLUMNS,
StringUtils.join(node.getPartitionColumns(), ","));
}
if (node.getPartitionerClass() != null) {
conf.set(PIPELINE + i + PARTITION_CLASS,
node.getPartitionerClass().getName());
}
if (node.getOutputGroupingColumns() != null) {
conf.set(PIPELINE + i + OUTPUT_GROUP_COLUMNS,
StringUtils.join(node.getOutputGroupingColumns(), ","));
}
if (node.getOutputKeyComparatorClass() != null) {
conf.set(PIPELINE + i + OUTPUT_KEY_COMPARATOR_CLASS,
node.getOutputKeyComparatorClass().getName());
}
if (node.getOutputKeyGroupingComparatorClass() != null) {
conf.set(PIPELINE + i + OUTPUT_KEY_GROUPING_COMPARATOR_CLASS,
node.getOutputKeyGroupingComparatorClass().getName());
}
if (node.getNumTasks() >= 0) {
if (i == 0) {
conf.setInt("odps.stage.mapper.num", node.getNumTasks());
} else {
conf.setInt("odps.stage.reducer." + i + ".num", node.getNumTasks());
}
conf.setBoolean("odps.sql.jobconf.odps2", true);
conf.setBoolean("odps.optimizer.cbo.enable.dynamic.parallelism", false);
}
if (node.getMemoryForTask() >= 0) {
if (i == 0) {
conf.setInt("odps.stage.mapper.mem", node.getMemoryForTask());
} else {
conf.setInt("odps.stage.reducer." + i + ".mem", node.getMemoryForTask());
}
conf.setBoolean("odps.sql.jobconf.odps2", true);
}
if (node.getMemoryForJVM() >= 0) {
if (i == 0) {
conf.setInt("odps.stage.mapper.jvm.mem", node.getMemoryForJVM());
} else {
conf.setInt("odps.stage.reducer." + i + ".jvm.mem", node.getMemoryForJVM());
}
conf.setBoolean("odps.sql.jobconf.odps2", true);
}
}
conf.set(PIPELINE_LIST, sb.toString());
}