LogicalOperatorTree genTree()

in odps-sdk-impl/odps-mapred-bridge/src/main/java/com/aliyun/odps/mapred/bridge/LOTGenerator.java [141:256]


  /**
   * Builds the logical operator tree for the configured job.
   *
   * <p>Gathers resources, input tables/volumes and output tables/volumes from the job and stores
   * them in the corresponding fields ({@code resourceItems}, {@code inputTableInfos},
   * {@code inputVolumeInfos}, {@code outputTableInfos}, {@code outputVolumeInfos}), and sets
   * {@code isNoOutput} / {@code isMultiInsert}. It then appends the output columns to
   * {@code outputColumns}:
   * <ul>
   *   <li>multi-insert (more than one output table): each table's columns are named
   *       {@code "multiins<offset>_<column>"}; a table whose stored column types already appear
   *       as a contiguous run reuses that run. The start offset of every table label is recorded
   *       in {@code outputIndexes}, and a {@code MULTI_INSERT_SELECTOR} string column is appended
   *       last;</li>
   *   <li>no output table (null or empty table list): a single {@code NO_OUTPUT_DUMMY_COLUMN}
   *       string column;</li>
   *   <li>exactly one output table: that table's schema.</li>
   * </ul>
   * For streaming output every output column is typed {@code STRING}.
   *
   * <p>When the job has a reducer, the intermediate map-output columns are the optional partition
   * id column followed by the prefixed map-output key and value columns. In pipe mode these schemas
   * come from the first pipeline node. Without a reducer the mapper writes the output columns
   * directly. Finally the map block is generated, followed by either the reduce block or the
   * map-only output handling.
   *
   * @return the built operator tree
   */
  LogicalOperatorTree genTree() {

    resourceItems = buildResourceList();

    LogicalOperatorTree.Builder builder = LogicalOperatorTree.newBuilder();

    // prepare input
    inputTableInfos = InputUtils.getTables(job);
    inputVolumeInfos = InputUtils.getVolumes(job);
    // FIXME multi-mapper
    Map<TableInfoKey, List<LinkedHashMap<String, String>>> inputTables =
        mergeInputTableInfos(inputTableInfos);

    // prepare output
    outputTableInfos = OutputUtils.getTables(job);
    outputVolumeInfos = OutputUtils.getVolumes(job);

    // FIXME multi-insert from m-r's mapper
    // An empty table list is handled like a missing one. Otherwise the single-output branch
    // below would index outputTableInfos[0] on an empty array.
    isNoOutput = outputTableInfos == null || outputTableInfos.length == 0;
    isMultiInsert = !isNoOutput && outputTableInfos.length > 1;

    // streaming job has string output columns
    boolean isStreamingOutput = job.getNumReduceTasks() > 0 ? isStreamingReduce : isStreamingMap;

    if (isMultiInsert) {
      // Stored types of the columns appended below, kept in parallel with outputColumns so that
      // an identical run of column types can be located and reused.
      List<OdpsType> outputColumnTypes = new ArrayList<OdpsType>();
      for (TableInfo ti : outputTableInfos) {
        String label = ti.getLabel();
        // Offset this table's columns get if they are appended; it is also part of their names.
        int appendIdx = outputColumns.size();
        List<Column> tbColumns = new ArrayList<Column>();
        List<OdpsType> tbColumnTypes = new ArrayList<OdpsType>();
        for (Column col : job.getOutputSchema(label)) {
          String colName = "multiins" + appendIdx + "_" + col.getName();
          if (isStreamingOutput) {
            tbColumns.add(new Column(colName, OdpsType.STRING));
            tbColumnTypes.add(OdpsType.STRING);
          } else {
            tbColumns.add(TypeUtils.createColumnWithNewName(colName, col));
            tbColumnTypes.add(col.getType());
          }
        }
        // Look for the same columns among those already appended. The candidate types are the
        // types as stored (STRING in streaming mode), so they are compared like-for-like with
        // outputColumnTypes.
        int existingIdx = Collections.indexOfSubList(outputColumnTypes, tbColumnTypes);
        if (existingIdx >= 0) {
          // merge columns for tableinfos with the same schema
          outputIndexes.put(label, existingIdx);
        } else {
          outputIndexes.put(label, appendIdx);
          outputColumns.addAll(tbColumns);
          outputColumnTypes.addAll(tbColumnTypes);
        }
      }
      outputColumns.add(new Column(MULTI_INSERT_SELECTOR, OdpsType.STRING));
    } else if (isNoOutput) {
      // FIXME currently UDTF need a output column
      outputColumns.add(new Column(NO_OUTPUT_DUMMY_COLUMN, OdpsType.STRING));
    } else {
      for (Column col : job.getOutputSchema(outputTableInfos[0].getLabel())) {
        if (isStreamingOutput) {
          outputColumns.add(new Column(col.getName(), OdpsType.STRING));
        } else {
          outputColumns.add(TypeUtils.cloneColumn(col));
        }
      }
    }

    // prepare intermediate key/value
    // FIXME types/signature
    // FIXME use col name or not?
    List<Column> mapOutColumns;
    List<Column> firstReduceInColumns = null;
    // Never changed in this method: the map block and map-only output both use index 0.
    int innerOutputIndex = 0;
    if (hasReducer) {
      mapOutColumns = new ArrayList<Column>();
      firstReduceInColumns = new ArrayList<Column>();

      if (hasPartitioner) {
        // The partition id is produced by the mapper only; it is not a reducer input column.
        mapOutColumns.add(new Column(PARTITION_ID, OdpsType.BIGINT));
      }

      Column[] keys = this.pipeMode
                      ? pipeline.getFirstNode().getOutputKeySchema()
                      : job.getMapOutputKeySchema();
      for (Column col : keys) {
        Column keyCol = TypeUtils.createColumnWithNewName(MAP_OUT_KEY_PREFIX + col.getName(), col);
        mapOutColumns.add(keyCol);
        firstReduceInColumns.add(keyCol);
      }
      Column[] values = this.pipeMode
                        ? pipeline.getFirstNode().getOutputValueSchema()
                        : job.getMapOutputValueSchema();
      for (Column col : values) {
        Column valCol = TypeUtils.createColumnWithNewName(MAP_OUT_VAL_PREFIX + col.getName(), col);
        mapOutColumns.add(valCol);
        firstReduceInColumns.add(valCol);
      }
    } else {
      // Map-only job: the mapper writes the final output columns itself.
      mapOutColumns = outputColumns;
    }

    // XXX: LOT does not support inner output together with multiple inputs, so inner output is
    // only requested when there is at most one input table.
    String mapperId = genMapBlock(builder, inputTables, mapOutColumns, innerOutputIndex,
        hasReducer && isInnerOutput && (inputTables.size() <= 1));

    if (hasReducer) {
      genReduceBlock(builder, firstReduceInColumns, mapperId);
    } else {
      // map only output
      handleOutput(builder, false, outputColumns, mapperId, isTableOverwrite, innerOutputIndex);
    }

    return builder.build();
  }