public Iterator readResourceTable()

in odps-sdk-impl/odps-mapred-local/src/main/java/com/aliyun/odps/mapred/local/LocalTaskContext.java [339:432]


  /**
   * Reads the records of a table resource through a lazily evaluated iterator.
   *
   * <p>If the job directory does not already hold the resource, it is first copied from the
   * warehouse using the session's default project. A failed copy is logged and otherwise
   * ignored, so reading proceeds with whatever is present in the resource directory. The table
   * schema is read from that directory, and the data files found under it are consumed one after
   * another, each through its own {@link CSVRecordReader}.
   *
   * <p>The returned iterator hands out a clone of each record, does not support
   * {@code remove()}, and rethrows an {@link IOException} raised while reading as a
   * {@link RuntimeException}. A reader is closed once its file is exhausted; a reader that is
   * still open when the caller abandons the iteration early is not closed by this method.
   *
   * @param tbl name of the table resource
   * @return an iterator over the records of all data files of the resource
   * @throws IOException if {@code tbl} is empty or null; presumably also when listing the data
   *         files or reading the schema fails (those helpers are not visible here)
   */
  public Iterator<Record> readResourceTable(String tbl) throws IOException {
    if (StringUtils.isEmpty(tbl)) {
      throw new IOException("Table resouce name is empty or null");
    }

    if (!jobDirecotry.hasResource(tbl)) {
      String project = SessionState.get().getOdps().getDefaultProject();
      try {
        WareHouse.getInstance().copyResource(project, tbl, jobDirecotry.getResourceDir(),
                                             WareHouse.getInstance().getLimitDownloadRecordCount(),
                                             WareHouse.getInstance().getInputColumnSeperator());
      } catch (OdpsException e) {
        // Stay best-effort as before, but leave a trace instead of silently hiding the failure.
        LOG.warn("Failed to copy resource table " + tbl + " from project " + project, e);
      }
    }

    File dir = new File(jobDirecotry.getResourceDir(), tbl);
    LOG.info("Reading resource table from " + dir);
    final List<File> datafiles = new ArrayList<File>();

    LocalRunUtils.listAllDataFiles(dir, datafiles);

    final TableMeta tableMeta = SchemaUtils.readSchema(dir);

    return new Iterator<Record>() {
      // Reader over the data file currently being consumed; null when no file is open.
      RecordReader reader;
      // Record pre-fetched by hasNext(); null once every data file is exhausted.
      Record current;
      // True while 'current' holds a look-ahead result that next() has not consumed yet.
      boolean fetched;

      @Override
      public boolean hasNext() {
        if (!fetched) {
          try {
            fetch();
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        }
        return current != null;
      }

      /**
       * Advances to the next available record, moving on to the following data file whenever
       * the current one is exhausted. Empty data files are skipped rather than ending the
       * iteration.
       */
      private void fetch() throws IOException {
        current = null;
        while (true) {
          if (reader == null) {
            if (datafiles.isEmpty()) {
              break;
            }
            reader = openReader(datafiles.remove(0));
          }

          current = reader.read();
          if (current != null) {
            break;
          }

          // Current file is exhausted: release it before trying the next one.
          reader.close();
          reader = null;
        }
        fetched = true;
      }

      /** Creates a CSV reader whose split covers the whole of the given data file. */
      private RecordReader openReader(File f) throws IOException {
        // File.length() is the file's size in bytes; getTotalSpace() would report the size of
        // the partition that holds the file.
        return new CSVRecordReader(new FileSplit(f, tableMeta.getCols(), 0, f.length()),
            tableMeta, LocalJobRunner.EMPTY_COUNTER, LocalJobRunner.EMPTY_COUNTER, counters,
            WareHouse.getInstance().getInputColumnSeperator());
      }

      @Override
      public Record next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        fetched = false;
        return current.clone();
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }

    };
  }