in odps-sdk-impl/odps-mapred-local/src/main/java/com/aliyun/odps/mapred/local/LocalTaskContext.java [339:432]
/**
 * Returns an iterator over the records of a table resource attached to the job.
 *
 * <p>If the job directory does not already hold the resource, it is first copied
 * from the warehouse for the session's default project. The data files found under
 * the resource directory are then read one after another, lazily, as the iterator
 * is advanced.
 *
 * <p>The returned iterator wraps any {@link IOException} raised while reading in a
 * {@link RuntimeException}, hands out a clone of each record, and does not support
 * {@code remove()}. A reader is closed as soon as its file is exhausted; a reader
 * that is still open when the caller abandons the iterator early is not closed.
 *
 * @param tbl name of the table resource; must not be null or empty
 * @return a lazy iterator over all records of the resource table
 * @throws IOException if {@code tbl} is null or empty, or if listing the data files
 *         or reading the schema fails
 */
public Iterator<Record> readResourceTable(String tbl) throws IOException {
  if (StringUtils.isEmpty(tbl)) {
    throw new IOException("Table resouce name is empty or null");
  }
  if (!jobDirecotry.hasResource(tbl)) {
    String project = SessionState.get().getOdps().getDefaultProject();
    try {
      WareHouse.getInstance().copyResource(project, tbl, jobDirecotry.getResourceDir(),
          WareHouse.getInstance().getLimitDownloadRecordCount(),
          WareHouse.getInstance().getInputColumnSeperator());
    } catch (OdpsException e) {
      // Still best effort (the read below is attempted anyway), but the failure
      // is no longer hidden: without it a later "missing file" error is baffling.
      LOG.warn("Failed to copy resource table '" + tbl + "' of project '" + project
          + "' into " + jobDirecotry.getResourceDir(), e);
    }
  }
  File dir = new File(jobDirecotry.getResourceDir(), tbl);
  LOG.info("Reading resource table from " + dir);
  final List<File> datafiles = new ArrayList<File>();
  LocalRunUtils.listAllDataFiles(dir, datafiles);
  final TableMeta tableMeta = SchemaUtils.readSchema(dir);
  return new Iterator<Record>() {
    /** Reader of the data file currently being consumed; null when none is open. */
    RecordReader reader;
    /** Record pre-fetched for the next call to next(); null once exhausted. */
    Record current;
    /** True when 'current' already holds the result of the latest fetch. */
    boolean fetched;

    @Override
    public boolean hasNext() {
      if (fetched) {
        return current != null;
      }
      // Fetch new one
      try {
        fetch();
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return current != null;
    }

    /** Opens a reader over the whole of the given data file. */
    private RecordReader open(File f) throws IOException {
      // f.length() is the size of the file itself; File.getTotalSpace() would be
      // the size of the partition that contains it.
      return new CSVRecordReader(new FileSplit(f, tableMeta.getCols(), 0, f.length()),
          tableMeta, LocalJobRunner.EMPTY_COUNTER, LocalJobRunner.EMPTY_COUNTER, counters,
          WareHouse.getInstance().getInputColumnSeperator());
    }

    /**
     * Loads the next record into 'current', moving on to the following data file
     * whenever the open one is exhausted. Empty files are skipped instead of
     * ending the iteration. Leaves 'current' null once every file is consumed.
     */
    private void fetch() throws IOException {
      current = null;
      while (current == null) {
        if (reader == null) {
          if (datafiles.isEmpty()) {
            break;
          }
          reader = open(datafiles.remove(0));
        }
        current = reader.read();
        if (current == null) {
          // This file is done: release it before trying the next one. The field
          // is cleared first so a failing close() cannot leave a stale reader.
          RecordReader exhausted = reader;
          reader = null;
          exhausted.close();
        }
      }
      fetched = true;
    }

    @Override
    public Record next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      fetched = false;
      return current.clone();
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  };
}