private void processInput()

in odps-sdk-impl/odps-graph-local/src/main/java/com/aliyun/odps/graph/local/LocalGraphJobRunner.java [160:252]


  /**
   * Prepares local input splits for one input table of the graph job.
   *
   * <p>Ensures the table's scheme and data exist in the local warehouse (downloading them from
   * the remote service when missing, or always when {@code DownloadMode.ALWAYS} is set), copies
   * the relevant data into the job's temp input directory — mirroring the warehouse directory
   * layout — and registers one {@link InputSplit} per data file in {@code inputs}.
   *
   * @param tableInfo the input table (project, table name, optional columns and partition spec)
   * @throws IOException   if the table is not partitioned but a partition spec was supplied,
   *                       or on local file errors
   * @throws OdpsException if the table cannot be downloaded from the remote host
   */
  private void processInput(TableInfo tableInfo) throws IOException, OdpsException {
    LOG.info("Processing input: " + tableInfo);

    String projName = tableInfo.getProjectName();
    if (projName == null) {
      // Fall back to the session's default project when none is given.
      projName = SessionState.get().getOdps().getDefaultProject();
    }
    String tblName = tableInfo.getTableName();
    String[] readCols = tableInfo.getCols();

    // Partitions this job is expected to read; null means "all partitions".
    PartitionSpec expectParts = buildExpectedPartitionSpec(tableInfo.getPartSpec());

    // This part only downloads the table scheme and data when they are missing locally
    // (or when download is forced); all subsequent processing is purely local.
    if (!wareHouse.existsTable(projName, tblName)
        || wareHouse.getDownloadMode() == DownloadMode.ALWAYS) {

      DownloadUtils
          .downloadTableSchemeAndData(odps, tableInfo, wareHouse.getLimitDownloadRecordCount(),
                                      wareHouse.getInputColumnSeperator());

      if (!wareHouse.existsTable(projName, tblName)) {
        throw new OdpsException("download table from remote host failure");
      }
    }

    // Metadata read from the warehouse _scheme_ file — mirrors the server-side table info.
    TableMeta whTblMeta = wareHouse.getTableMeta(projName, tblName);
    Column[] whReadFields = LocalRunUtils.getInputTableFields(whTblMeta,
                                                              readCols);
    List<PartitionSpec> whParts = wareHouse.getPartitions(projName, tblName);

    if (whParts.size() > 0) {
      // Partitioned table: one pass per matching partition.
      addPartitionedSplits(tableInfo, whTblMeta, whReadFields, readCols, whParts, expectParts);
    } else {
      // Not a partitioned table: reject a partition spec targeting it.
      if (tableInfo.getPartSpec() != null && tableInfo.getPartSpec().size() > 0) {
        throw new IOException(ExceptionCode.ODPS_0720121 + "table "
                              + projName + "." + tblName + " is not partitioned table");
      }
      addNonPartitionedSplits(tableInfo, whTblMeta, whReadFields, readCols);
    }

  }

  /**
   * Converts the user-specified partition map into a {@link PartitionSpec}
   * ("k1=v1,k2=v2,…"); returns null when no partitions were specified.
   */
  private static PartitionSpec buildExpectedPartitionSpec(
      LinkedHashMap<String, String> expectPartsHashMap) {
    if (expectPartsHashMap == null || expectPartsHashMap.isEmpty()) {
      return null;
    }
    // StringBuilder (not StringBuffer): local, single-threaded use needs no synchronization.
    StringBuilder sb = new StringBuilder();
    for (String key : expectPartsHashMap.keySet()) {
      if (sb.length() > 0) {
        sb.append(",");
      }
      sb.append(key).append("=").append(expectPartsHashMap.get(key));
    }
    return new PartitionSpec(sb.toString());
  }

  /** Copies each matching partition into the temp input dir and adds one split per data file. */
  private void addPartitionedSplits(TableInfo tableInfo, TableMeta whTblMeta,
                                    Column[] whReadFields, String[] readCols,
                                    List<PartitionSpec> whParts, PartitionSpec expectParts)
      throws IOException, OdpsException {
    for (PartitionSpec partSpec : whParts) {
      // Skip partitions outside the user-specified filter.
      if (!match(expectParts, partSpec)) {
        continue;
      }
      File whSrcDir = wareHouse.getPartitionDir(whTblMeta.getProjName(),
                                                whTblMeta.getTableName(), partSpec);
      // Add input splits only when the source dir actually contains data files.
      if (LocalRunUtils.listDataFiles(whSrcDir).size() > 0) {

        // Keep the local temp directory layout identical to the warehouse layout.
        File tempDataDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
            whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec));
        File tempSchemeDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
            whTblMeta.getProjName(), whTblMeta.getTableName(), null));
        wareHouse.copyTable(whTblMeta.getProjName(), whTblMeta.getTableName(), partSpec,
                            readCols, tempSchemeDir, wareHouse.getLimitDownloadRecordCount(),
                            wareHouse.getInputColumnSeperator());
        for (File file : LocalRunUtils.listDataFiles(tempDataDir)) {
          inputs.add(new InputSplit(file, whReadFields, 0L, file.length(), tableInfo));
        }
      }
    }
  }

  /** Copies a non-partitioned table into the temp input dir and adds one split per data file. */
  private void addNonPartitionedSplits(TableInfo tableInfo, TableMeta whTblMeta,
                                       Column[] whReadFields, String[] readCols)
      throws IOException, OdpsException {
    File whSrcDir = wareHouse.getTableDir(whTblMeta.getProjName(),
                                          whTblMeta.getTableName());
    if (LocalRunUtils.listDataFiles(whSrcDir).size() > 0) {
      // Keep the local temp directory layout identical to the warehouse layout.
      File tempDataDir = jobDirecotry.getInputDir(wareHouse.getRelativePath(
          whTblMeta.getProjName(), whTblMeta.getTableName(), null));
      // For a non-partitioned table the scheme lives alongside the data.
      File tempSchemeDir = tempDataDir;
      wareHouse.copyTable(whTblMeta.getProjName(), whTblMeta.getTableName(), null, readCols,
                          tempSchemeDir, wareHouse.getLimitDownloadRecordCount(),
                          wareHouse.getInputColumnSeperator());
      for (File file : LocalRunUtils.listDataFiles(tempDataDir)) {
        inputs.add(new InputSplit(file, whReadFields, 0L, file.length(), tableInfo));
      }
    }
  }