in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/streaming/StreamJob.java [590:789]
protected void setJobConf() throws IOException {
// general MapRed job properties
jobConf_ = new JobConf(config_);
// All streaming jobs get the task timeout value
// from the configuration settings.
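// Register each parsed input table spec as an input table of the job.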
for (int i = 0; i < inputSpecs_.size(); i++) {
InputUtils.addTable(inputSpecs_.get(i), jobConf_);
}
String defaultPackage = this.getClass().getPackage().getName();
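// An explicit io spec (streaming-style -io option), when given, applies to all four
// streaming boundaries: map/reduce input and output.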
if (ioSpec_ != null) {
jobConf_.set("stream.map.input", ioSpec_);
jobConf_.set("stream.map.output", ioSpec_);
jobConf_.set("stream.reduce.input", ioSpec_);
jobConf_.set("stream.reduce.output", ioSpec_);
}
//Class<? extends IdentifierResolver> idResolverClass =
// jobConf_.getClass("stream.io.identifier.resolver.class",
// IdentifierResolver.class, IdentifierResolver.class);
//IdentifierResolver idResolver = ReflectionUtils.newInstance(idResolverClass, jobConf_);
//idResolver.resolve(jobConf_.get("stream.map.input", IdentifierResolver.TEXT_ID));
//jobConf_.setClass("stream.map.input.writer.class",
// idResolver.getInputWriterClass(), InputWriter.class);
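// Fixed writer classes replace the commented-out resolver lookup: the mapper command
// is fed ODPS table records (RecordInputWriter), while the reducer command is fed the
// text key/value stream coming out of the shuffle (TextInputWriter).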
jobConf_.setClass("stream.map.input.writer.class", RecordInputWriter.class, InputWriter.class);
//idResolver.resolve(jobConf_.get("stream.reduce.input", IdentifierResolver.TEXT_ID));
//jobConf_.setClass("stream.reduce.input.writer.class",
// idResolver.getInputWriterClass(), InputWriter.class);
jobConf_.setClass("stream.reduce.input.writer.class", TextInputWriter.class, InputWriter.class);
jobConf_.set("stream.addenvironment", addTaskEnvironment_);
boolean isMapperACommand = false;
Class c = null;
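// Resolve the mapper spec: a loadable class is used directly as the mapper; anything
// else is treated as an external command and wrapped by PipeMapper.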
if (mapCmd_ != null) {
c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
if (c != null) {
jobConf_.setMapperClass(c);
} else {
isMapperACommand = true;
jobConf_.setMapperClass(PipeMapper.class);
//jobConf_.setMapRunnerClass(PipeMapRunner.class);
jobConf_.set("stream.map.streamprocessor",
URLEncoder.encode(mapCmd_, "UTF-8"));
}
}
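// The combiner spec is resolved the same way, with PipeCombiner wrapping a command.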
if (comCmd_ != null) {
c = StreamUtil.goodClassOrNull(jobConf_, comCmd_, defaultPackage);
if (c != null) {
jobConf_.setCombinerClass(c);
} else {
jobConf_.setCombinerClass(PipeCombiner.class);
jobConf_.set("stream.combine.streamprocessor", URLEncoder.encode(
comCmd_, "UTF-8"));
}
}
if (numReduceTasksSpec_ != null) {
int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
jobConf_.setNumReduceTasks(numReduceTasks);
}
boolean isReducerACommand = false;
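// Resolve the reducer spec: NONE disables the reduce phase entirely; otherwise a
// loadable class is used directly and a command is wrapped by PipeReducer. The Hadoop
// 'aggregate' shortcut has not been ported.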
if (redCmd_ != null) {
if (redCmd_.equals(REDUCE_NONE)) {
jobConf_.setNumReduceTasks(0);
}
if (jobConf_.getNumReduceTasks() != 0) {
if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
//jobConf_.setReducerClass(ValueAggregatorReducer.class);
//jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
// TODO reducer lib
throw new UnsupportedOperationException("'aggregate' reducer not supported yet");
} else {
c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
if (c != null) {
jobConf_.setReducerClass(c);
} else {
isReducerACommand = true;
jobConf_.setReducerClass(PipeReducer.class);
jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
redCmd_, "UTF-8"));
}
}
}
}
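// Field separators and key field counts determine how the text emitted by the mapper
// is split into key and value. With a multi-field key the map-output and reduce-input
// separators must agree, which the check below enforces.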
String mapOutputFieldSeparator = unescapeSeparator(jobConf_.get("stream.map.output.field.separator", "\t"));
String reduceInputFieldSeparator = unescapeSeparator(jobConf_.get("stream.reduce.input.field.separator", "\t"));
int numOfMapOutputKeyFields = jobConf_.getInt("stream.num.map.output.key.fields", 1);
if (numOfMapOutputKeyFields > 1 && !mapOutputFieldSeparator.equals(reduceInputFieldSeparator)) {
throw new IllegalArgumentException(
"for a multi-field key, stream.reduce.input.field.separator must be the same as stream.map.output.field.separator to avoid confusion");
}
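// Build the intermediate (map output) key schema: one column per key field, typed
// BIGINT when the key option marks the field as numeric, STRING otherwise.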
Column[] mapOutputKeySchema = new Column[numOfMapOutputKeyFields];
Map<Integer, KeyDescription> keyOptions =
parseKeyOptions(jobConf_.get("stream.map.output.key.options", ""));
for (int i = 0; i < mapOutputKeySchema.length; i++) {
KeyDescription keyDesc = keyOptions.get(i + 1);
OdpsType t = (keyDesc == null || !keyDesc.numeric) ? OdpsType.STRING : OdpsType.BIGINT;
mapOutputKeySchema[i] = new Column("map_out_key" + i, t);
}
jobConf_.setMapOutputKeySchema(mapOutputKeySchema);
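// Per-field reverse options become the sort order of the corresponding key columns.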
if (!keyOptions.isEmpty()) {
JobConf.SortOrder[] sortOrder = new JobConf.SortOrder[mapOutputKeySchema.length];
for (int i = 0; i < mapOutputKeySchema.length; i++) {
KeyDescription keyDesc = keyOptions.get(i + 1);
sortOrder[i] =
(keyDesc == null || !keyDesc.reverse) ? JobConf.SortOrder.ASC : JobConf.SortOrder.DESC;
}
jobConf_.setOutputKeySortOrder(sortOrder);
}
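// The entire streaming value travels as a single STRING column.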
jobConf_.setMapOutputValueSchema(new Column[]{new Column("map_out_value", OdpsType.STRING)});
// use setPartitionColumns for KeyFieldBasedPartitioner
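// KeyFieldBasedPartitioner (short or fully qualified name) is the only supported
// partitioner. When the key-field separator matches the map output separator, the
// leading key columns double as partition columns; otherwise the partitioner class
// itself is set and the key is re-split at runtime.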
if (partitionerSpec_ != null) {
if (partitionerSpec_.equals("KeyFieldBasedPartitioner")) {
partitionerSpec_ = "com.aliyun.odps.mapred.lib.KeyFieldBasedPartitioner";
}
if (partitionerSpec_.equals("com.aliyun.odps.mapred.lib.KeyFieldBasedPartitioner")) {
String mapOutputKeyFieldSeparator = unescapeSeparator(jobConf_.get("map.output.key.field.separator", "\t"));
if (mapOutputFieldSeparator.equals(mapOutputKeyFieldSeparator)) {
int numOfKeyFieldsForPartition = jobConf_.getInt("num.key.fields.for.partition", 1);
if (numOfKeyFieldsForPartition > numOfMapOutputKeyFields) {
throw new IllegalArgumentException(
"num.key.fields.for.partition should not be bigger than stream.num.map.output.key.fields");
}
if (numOfKeyFieldsForPartition < numOfMapOutputKeyFields) {
String[] partitionColumns = new String[numOfKeyFieldsForPartition];
for (int i = 0; i < numOfKeyFieldsForPartition; i++) {
partitionColumns[i] = mapOutputKeySchema[i].getName();
}
jobConf_.setPartitionColumns(partitionColumns);
}
} else {
// need to split the first field for partitioning, only for compatibility with Hadoop.
// FIXME this partitioner would be implemented by the StreamingOperator at runtime...
c = StreamUtil.goodClassOrNull(jobConf_, partitionerSpec_, defaultPackage);
if (c != null) {
jobConf_.setPartitionerClass(c);
}
}
} else {
throw new IllegalArgumentException(
"User-defined partitioners are not supported for streaming jobs");
}
}
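// Choose output readers for the streaming commands: the stage that writes the final
// table emits plain records (RecordOutputReader) unless output key fields are
// configured, in which case its text output is split by TextOutputReader. Map output
// in a job with reducers is always read back as text.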
Class mapOutputReaderClass = TextOutputReader.class;
Class reduceOutputReaderClass = RecordOutputReader.class;
if (jobConf_.getNumReduceTasks() > 0) {
boolean hasKey = jobConf_.getInt("stream.num.reduce.output.key.fields", 0) > 0;
reduceOutputReaderClass = hasKey ? TextOutputReader.class : RecordOutputReader.class;
} else {
boolean hasKey = jobConf_.getInt("stream.num.map.output.key.fields", 0) > 0;
mapOutputReaderClass = hasKey ? TextOutputReader.class : RecordOutputReader.class;
}
jobConf_.setClass("stream.map.output.reader.class", mapOutputReaderClass, OutputReader.class);
jobConf_.setClass("stream.reduce.output.reader.class", reduceOutputReaderClass, OutputReader.class);
// XXX no-output allowed
if (output_ != null) {
OutputUtils.addTable(parseTableInfo(output_), jobConf_);
}
//if(mapDebugSpec_ != null){
// jobConf_.setMapDebugScript(mapDebugSpec_);
//}
//if(reduceDebugSpec_ != null){
// jobConf_.setReduceDebugScript(reduceDebugSpec_);
//}
// last, allow user to override anything
// (although typically used with properties we didn't touch)
// FIXME resources linkname
if (verbose_) {
listJobConfProperties();
}
}