private void processInput()

in odps-sdk-impl/odps-mapred-local/src/main/java/com/aliyun/odps/mapred/LocalJobRunner.java [404:496]


  /**
   * Registers the local input splits for one MapReduce input table.
   *
   * <p>If {@code tableInfo} has no project name, it is set to the warehouse's default ODPS project.
   * The table schema and data are downloaded into the local warehouse in two cases: when the
   * requested partition is not present locally, or when the warehouse download mode is
   * {@link DownloadMode#ALWAYS}.
   *
   * <p>Then, for every matching warehouse partition that has data files (or for the whole table,
   * when the warehouse lists no partitions), the data is copied into the job's input directory.
   * One {@link FileSplit} per copied data file is added to {@code inputs} and mapped to
   * {@code tableInfo} in {@code splitToTableInfo}.
   *
   * @param tableInfo the input table description; its project name may be filled in as a side
   *     effect
   * @throws IllegalArgumentException if {@code tableInfo} is null or its table name is blank
   * @throws RuntimeException if the requested table/partition is still missing locally after the
   *     download attempt
   * @throws IOException if a partition spec was given but the warehouse lists no partitions for
   *     the table, or as propagated from the helpers called here
   * @throws OdpsException as propagated from the download/warehouse helpers called here
   */
  private void processInput(TableInfo tableInfo) throws IOException, OdpsException {
    if (tableInfo == null || StringUtils.isBlank(tableInfo.getTableName())) {
      // Still a RuntimeException, so callers that relied on the old type are unaffected.
      throw new IllegalArgumentException("Invalid TableInfo: " + tableInfo);
    }

    if (StringUtils.isEmpty(tableInfo.getProjectName())) {
      tableInfo.setProjectName(wareHouse.getOdps().getDefaultProject());
    }

    String[] readCols = tableInfo.getCols();

    // The partition(s) this MR job has been asked to process.
    PartitionSpec expectParts = tableInfo.getPartitionSpec();

    // This step only fetches the table schema and data into the local warehouse; everything after
    // it handles the table exactly like a purely local one.
    if (!wareHouse.existsPartition(tableInfo.getProjectName(), tableInfo.getTableName(),
                                   expectParts)
        || wareHouse.getDownloadMode() == DownloadMode.ALWAYS) {
      DownloadUtils.downloadTableSchemeAndData(odps, tableInfo,
                                               wareHouse.getLimitDownloadRecordCount(),
                                               wareHouse.getInputColumnSeperator());

      if (!wareHouse.existsPartition(tableInfo.getProjectName(), tableInfo.getTableName(),
                                     expectParts)) {
        throw new RuntimeException(LocalRunUtils.getDownloadErrorMsg(tableInfo.toString()));
      }
    }

    // Metadata read from the warehouse schema; it stands in for the server-side table definition.
    TableMeta whTblMeta =
        wareHouse.getTableMeta(tableInfo.getProjectName(), tableInfo.getTableName());
    Column[] whReadFields = LocalRunUtils.getInputTableFields(whTblMeta, readCols);
    List<PartitionSpec> whParts =
        wareHouse.getPartitions(tableInfo.getProjectName(), tableInfo.getTableName());
    String whProject = whTblMeta.getProjName();
    String whTable = whTblMeta.getTableName();

    if (!whParts.isEmpty()) {
      // Partitioned table: handle each requested partition that actually holds data files.
      for (PartitionSpec partSpec : whParts) {
        if (!PartitionUtils.match(expectParts, partSpec)) {
          continue; // not one of the requested partitions
        }
        File whSrcDir = wareHouse.getPartitionDir(whProject, whTable, partSpec);
        if (LocalRunUtils.listDataFiles(whSrcDir).isEmpty()) {
          continue; // no data files, so no splits to add
        }

        // Mirror the warehouse directory layout inside the job's temporary input directory.
        File tempDataDir = jobDirecotry.getInputDir(
            wareHouse.getRelativePath(whProject, whTable, partSpec));
        File tempSchemeDir = jobDirecotry.getInputDir(
            wareHouse.getRelativePath(whProject, whTable, null));
        wareHouse.copyTable(whProject, whTable, partSpec, readCols, tempSchemeDir,
                            wareHouse.getLimitDownloadRecordCount(),
                            wareHouse.getInputColumnSeperator());
        registerInputSplits(tempDataDir, whReadFields, tableInfo);
      }
    } else {
      // Non-partitioned table: a partition spec in the request is an error.
      if (tableInfo.getPartSpec() != null && tableInfo.getPartSpec().size() > 0) {
        throw new IOException(
            MRExceptionCode.ODPS_0720121 + "table " + tableInfo.getProjectName() + "." + tableInfo
                .getTableName()
            + " is not partitioned table");
      }

      File whSrcDir = wareHouse.getTableDir(whProject, whTable);
      if (!LocalRunUtils.listDataFiles(whSrcDir).isEmpty()) {
        // Mirror the warehouse layout; without partitions, schema and data share one directory.
        File tempDataDir = jobDirecotry.getInputDir(
            wareHouse.getRelativePath(whProject, whTable, null));
        wareHouse.copyTable(whProject, whTable, null, readCols, tempDataDir,
                            wareHouse.getLimitDownloadRecordCount(),
                            wareHouse.getInputColumnSeperator());
        registerInputSplits(tempDataDir, whReadFields, tableInfo);
      }
    }
  }

  /**
   * Adds one whole-file {@link FileSplit} (offset 0, full file length) for every data file listed
   * under {@code dataDir}.
   *
   * <p>Each split is appended to {@code inputs} and mapped to {@code tableInfo} in
   * {@code splitToTableInfo}.
   */
  private void registerInputSplits(File dataDir, Column[] fields, TableInfo tableInfo)
      throws IOException {
    for (File file : LocalRunUtils.listDataFiles(dataDir)) {
      FileSplit split = new FileSplit(file, fields, 0L, file.length());
      splitToTableInfo.put(split, tableInfo);
      inputs.add(split);
    }
  }