in odps-sdk-impl/odps-common-local/src/main/java/com/aliyun/odps/local/common/WareHouse.java [1046:1157]
/**
 * Returns an iterator over the records of the table referenced by a table resource.
 *
 * <p>If the resource does not exist locally it is first fetched with
 * {@code DownloadUtils.downloadResource}. The resource file must then be a directory; the
 * referenced table and partition are resolved through {@code getReferencedTable}, and the
 * table's data files are read one after another, in the order returned by
 * {@code getDataFiles}.
 *
 * <p>The returned iterator is lazy: a data file is opened only when a record from it is
 * needed, and its reader is closed as soon as the file is exhausted. Data files that yield no
 * record are skipped. If the caller stops iterating before {@code hasNext()} returns
 * {@code false}, the reader of the file currently being read is left open.
 * {@code hasNext()} rethrows any {@link IOException} as a {@link RuntimeException};
 * {@code remove()} is unsupported.
 *
 * @param project project that owns the resource and the referenced table
 * @param resource name of the table resource
 * @param inputColumnSeperator column separator used when downloading and parsing data files
 * @return iterator producing one {@code Object[]} per record, each value converted according
 *         to the column type at the same index in the table schema
 * @throws IOException declared by this method; presumably raised by the helpers that read
 *         the schema or list the data files
 * @throws OdpsException if the resource file is not a directory, or if a called helper
 *         reports an ODPS error
 */
public Iterator<Object[]> readResourceTable(String project, String resource,
    final char inputColumnSeperator)
    throws IOException, OdpsException {
  if (!existsResource(project, resource)) {
    // NOTE(review): the download is requested for the default project of getOdps(), not for
    // the 'project' argument -- kept as in the original; confirm this is intended.
    DownloadUtils.downloadResource(WareHouse.getInstance().getOdps(), getOdps()
        .getDefaultProject(), resource, getLimitDownloadRecordCount(), inputColumnSeperator);
  }

  File tableResourceDir = getReourceFile(project, resource);
  if (!tableResourceDir.isDirectory()) {
    throw new OdpsException("Resource " + project + "." + resource
        + " is not a valid file Resource, because it is not a direcotry");
  }

  TableInfo tableInfo = getReferencedTable(project, resource);
  PartitionSpec partitionSpec = PartitionUtils.convert(tableInfo.getPartSpec());
  final List<File> datafiles = getDataFiles(project, tableInfo.getTableName(), partitionSpec,
      inputColumnSeperator);
  final Column[] schema = SchemaUtils.readSchema(getTableDir(project, tableInfo.getTableName()))
      .getCols();

  return new Iterator<Object[]>() {
    // Walks the data files in order without mutating the list returned by getDataFiles.
    private final Iterator<File> pendingFiles = datafiles.iterator();
    // Reader of the data file currently being consumed; null when no file is open.
    private CsvReader reader;
    // Look-ahead record; null once every data file has been exhausted.
    private Object[] current;
    // True while 'current' holds a look-ahead result that next() has not consumed yet.
    private boolean fetched;

    @Override
    public boolean hasNext() {
      if (!fetched) {
        try {
          fetch();
        } catch (IOException e) {
          throw new RuntimeException(e);
        }
      }
      return current != null;
    }

    /**
     * Loads the next record into {@code current}, moving on to the following data files for
     * as long as the open one yields nothing. Leaves {@code current} null when no data file
     * is left.
     */
    private void fetch() throws IOException {
      current = null;
      while (current == null) {
        if (reader == null) {
          if (!pendingFiles.hasNext()) {
            break;
          }
          File f = pendingFiles.next();
          reader = DownloadUtils.newCsvReader(f.getAbsolutePath(), inputColumnSeperator,
              encoding);
          reader.setSafetySwitch(false);
        }
        current = read();
        if (current == null) {
          // This file is exhausted: release its handle before opening the next one.
          reader.close();
          reader = null;
        }
      }
      fetched = true;
    }

    @Override
    public Object[] next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      fetched = false;
      return current;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }

    /**
     * Reads one record from the open reader and converts each value to its column type.
     * Returns null when the reader has no further record or the record carries no values.
     */
    private Object[] read() throws IOException {
      if (!reader.readRecord()) {
        return null;
      }
      String[] vals = reader.getValues();
      if (vals == null || vals.length == 0) {
        return null;
      }
      if (vals.length > schema.length) {
        // Fail with a descriptive error instead of an ArrayIndexOutOfBoundsException below.
        throw new IOException("Record has " + vals.length + " values but the table schema has only "
            + schema.length + " columns");
      }
      Object[] result = new Object[vals.length];
      for (int i = 0; i < vals.length; i++) {
        result[i] = TypeConvertUtils.fromString(schema[i].getTypeInfo(), vals[i], false);
      }
      return result;
    }
  };
}