in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/LOTGenerator.java [141:256]
/**
 * Generates the logical operator tree for the configured job.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Builds the resource list and reads the input tables/volumes from {@code job};
 *       the input table infos are merged via {@code mergeInputTableInfos}.</li>
 *   <li>Reads the output tables/volumes and derives {@code isNoOutput} (no output table
 *       array at all) and {@code isMultiInsert} (more than one output table).</li>
 *   <li>Fills {@code outputColumns}: a single dummy STRING column when there is no output,
 *       the concatenated (and, where possible, shared) columns of every output table plus a
 *       selector column for multi-insert, or a copy of the only table's schema otherwise.
 *       For streaming output every generated column is typed STRING.</li>
 *   <li>When a reducer exists, builds the map-output columns (optional partition id,
 *       prefixed key columns, prefixed value columns); otherwise the mapper emits
 *       {@code outputColumns} directly.</li>
 *   <li>Emits the map block, then either the reduce block or the map-only output.</li>
 * </ol>
 *
 * <p>Side effects visible here: assigns {@code resourceItems}, {@code inputTableInfos},
 * {@code inputVolumeInfos}, {@code outputTableInfos}, {@code outputVolumeInfos},
 * {@code isNoOutput} and {@code isMultiInsert}, and appends to {@code outputColumns} and
 * {@code outputIndexes}.
 *
 * @return the tree produced by the builder once all blocks have been generated
 */
LogicalOperatorTree genTree() {
  resourceItems = buildResourceList();
  LogicalOperatorTree.Builder treeBuilder = LogicalOperatorTree.newBuilder();

  // ---- inputs ----
  inputTableInfos = InputUtils.getTables(job);
  inputVolumeInfos = InputUtils.getVolumes(job);
  // FIXME multi-mapper
  Map<TableInfoKey, List<LinkedHashMap<String, String>>> mergedInputs =
      mergeInputTableInfos(inputTableInfos);

  // ---- outputs ----
  outputTableInfos = OutputUtils.getTables(job);
  outputVolumeInfos = OutputUtils.getVolumes(job);
  // FIXME multi-insert from m-r's mapper
  if (outputTableInfos == null) {
    isNoOutput = true;
    isMultiInsert = false;
  } else {
    isNoOutput = false;
    isMultiInsert = outputTableInfos.length > 1;
  }

  // A streaming job emits string columns; which stage decides depends on whether
  // the job has reduce tasks.
  boolean streamingOutput;
  if (job.getNumReduceTasks() > 0) {
    streamingOutput = isStreamingReduce;
  } else {
    streamingOutput = isStreamingMap;
  }

  if (isNoOutput) {
    // FIXME currently UDTF need a output column
    outputColumns.add(new Column(NO_OUTPUT_DUMMY_COLUMN, OdpsType.STRING));
  } else if (isMultiInsert) {
    // Multi-insert: lay the columns of all output tables side by side.
    // mergedTypes mirrors the types of the columns appended to outputColumns below.
    // NOTE(review): offsets found in mergedTypes are stored next to offsets taken from
    // outputColumns.size(); the two agree only if outputColumns is empty on entry --
    // presumably true, confirm against the field's initialisation.
    List<OdpsType> mergedTypes = new ArrayList<OdpsType>();
    for (TableInfo table : outputTableInfos) {
      // Declared column types of this table, used to look for a reusable column run.
      // NOTE(review): these are the declared types even for streaming output, whereas
      // mergedTypes records STRING in that mode.
      List<OdpsType> tableTypes = new ArrayList<OdpsType>();
      for (Column declared : job.getOutputSchema(table.getLabel())) {
        tableTypes.add(declared.getType());
      }

      // Reuse an existing run of columns with the same type sequence when there is one;
      // otherwise this table's columns start at the current end of outputColumns.
      int offset = Collections.indexOfSubList(mergedTypes, tableTypes);
      boolean reusesExistingColumns = offset >= 0;
      if (!reusesExistingColumns) {
        offset = outputColumns.size();
      }
      outputIndexes.put(table.getLabel(), offset);

      if (!reusesExistingColumns) {
        for (Column declared : job.getOutputSchema(table.getLabel())) {
          String mergedName = "multiins" + offset + "_" + declared.getName();
          Column mergedColumn = streamingOutput
              ? new Column(mergedName, OdpsType.STRING)
              : TypeUtils.createColumnWithNewName(mergedName, declared);
          outputColumns.add(mergedColumn);
          OdpsType mergedType = streamingOutput ? OdpsType.STRING : declared.getType();
          mergedTypes.add(mergedType);
        }
      }
    }
    // Trailing selector column, appended once after all table columns.
    outputColumns.add(new Column(MULTI_INSERT_SELECTOR, OdpsType.STRING));
  } else {
    // Exactly one output table: mirror its schema.
    for (Column declared : job.getOutputSchema(outputTableInfos[0].getLabel())) {
      Column outputColumn = streamingOutput
          ? new Column(declared.getName(), OdpsType.STRING)
          : TypeUtils.cloneColumn(declared);
      outputColumns.add(outputColumn);
    }
  }

  // ---- intermediate key/value ----
  // FIXME types/signature
  // FIXME use col name or not?
  final int innerOutputIndex = 0;
  // Without a reducer the mapper writes the final output columns directly.
  List<Column> mapOutput = outputColumns;
  List<Column> reduceInput = null;
  if (hasReducer) {
    mapOutput = new ArrayList<Column>();
    reduceInput = new ArrayList<Column>();
    if (hasPartitioner) {
      // The partition id is part of the map output only, not of the reduce input.
      mapOutput.add(new Column(PARTITION_ID, OdpsType.BIGINT));
    }

    Column[] keySchema;
    if (this.pipeMode) {
      keySchema = pipeline.getFirstNode().getOutputKeySchema();
    } else {
      keySchema = job.getMapOutputKeySchema();
    }
    for (Column key : keySchema) {
      Column prefixedKey =
          TypeUtils.createColumnWithNewName(MAP_OUT_KEY_PREFIX + key.getName(), key);
      mapOutput.add(prefixedKey);
      reduceInput.add(prefixedKey);
    }

    Column[] valueSchema;
    if (this.pipeMode) {
      valueSchema = pipeline.getFirstNode().getOutputValueSchema();
    } else {
      valueSchema = job.getMapOutputValueSchema();
    }
    for (Column value : valueSchema) {
      Column prefixedValue =
          TypeUtils.createColumnWithNewName(MAP_OUT_VAL_PREFIX + value.getName(), value);
      mapOutput.add(prefixedValue);
      reduceInput.add(prefixedValue);
    }
  }

  //XXX: lot not support multi inputs with inner output
  boolean innerOutputInMap = hasReducer && isInnerOutput && mergedInputs.size() <= 1;
  String mapperId =
      genMapBlock(treeBuilder, mergedInputs, mapOutput, innerOutputIndex, innerOutputInMap);

  if (hasReducer) {
    genReduceBlock(treeBuilder, reduceInput, mapperId);
  } else {
    // map only output
    handleOutput(treeBuilder, false, outputColumns, mapperId, isTableOverwrite, innerOutputIndex);
  }
  return treeBuilder.build();
}