public Iterator readResourceTable()

in odps-sdk-impl/odps-common-local/src/main/java/com/aliyun/odps/local/common/WareHouse.java [1046:1157]


  public Iterator<Object[]> readResourceTable(String project, String resource,
                                              final char inputColumnSeperator)
      throws IOException, OdpsException {
    if (!existsResource(project, resource)) {
      DownloadUtils.downloadResource(WareHouse.getInstance().getOdps(), getOdps()
          .getDefaultProject(), resource, getLimitDownloadRecordCount(), inputColumnSeperator);
    }

    File tableResourceDir = getReourceFile(project, resource);
    if (!tableResourceDir.isDirectory()) {
      throw new OdpsException("Resource " + project + "." + resource
                              + " is not a valid file Resource, because it is not a direcotry");
    }

    // LOG.info("Reading resource table from " +
    // tableResourceDir.getAbsolutePath());
    TableInfo tableInfo = getReferencedTable(project, resource);
    PartitionSpec partitionSpec = PartitionUtils.convert(tableInfo.getPartSpec());

    final List<File> datafiles = getDataFiles(project, tableInfo.getTableName(), partitionSpec,
                                              inputColumnSeperator);

    final Column[] schema = SchemaUtils.readSchema(getTableDir(project, tableInfo.getTableName()))
        .getCols();

    return new Iterator<Object[]>() {
      CsvReader reader;
      Object[] current;
      boolean fetched;

      @Override
      public boolean hasNext() {
        if (fetched) {
          return current != null;
        }
        // Fetch new one
        try {
          fetch();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
        return current != null;

      }

      private void fetch() throws IOException {

        // first time
        if (reader == null) {
          if (datafiles.isEmpty()) {
            current = null;
            fetched = true;
            return;
          }

          File f = datafiles.remove(0);
          reader = DownloadUtils.newCsvReader(f.getAbsolutePath(), inputColumnSeperator, encoding);
          reader.setSafetySwitch(false);
          current = read();
          fetched = true;
          return;
        }

        current = read();

        if (current == null && !datafiles.isEmpty()) {
          File f = datafiles.remove(0);
          reader = DownloadUtils.newCsvReader(f.getAbsolutePath(), inputColumnSeperator, encoding);
          reader.setSafetySwitch(false);

          current = read();

          fetched = true;
          return;
        }

        fetched = true;
      }

      @Override
      public Object[] next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        fetched = false;
        return current;
      }

      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }

      private Object[] read() throws IOException {
        Object[] result;
        if (!reader.readRecord()) {
          return null;
        }
        String[] vals = reader.getValues();
        if (vals == null || vals.length == 0) {
          result = null;
        } else {
          result = new Object[vals.length];
          for (int i = 0; i < vals.length; i++) {
            result[i] = TypeConvertUtils.fromString(schema[i].getTypeInfo(), vals[i], false);
          }
        }
        return result;
      }

    };
  }