in tajo-storage/src/main/java/org/apache/tajo/storage/v2/RCFile.java [1123:1216]
/**
 * Creates a reader on top of an already-opened stream, covering the byte
 * range that begins at {@code start} and ends at {@code start + length}.
 *
 * <p>Construction always seeks to offset 0 and calls {@code init()} first
 * (presumably to parse the file header and populate {@code metadata} and
 * {@code codec}; that method is not visible here), and only then seeks to
 * {@code start} when it is greater than zero. Afterwards the column count is
 * read from the file metadata and the column projection state is built from
 * the IDs returned by {@code ColumnProjectionUtils.getReadColumnIDs(conf)}.
 *
 * <p>If any step of construction fails, {@code sin} is closed before the
 * exception propagates, so a failed constructor never leaves the stream open.
 * A failure raised by that close is only logged at debug level.
 *
 * @param file       path of the file being read; stored as-is
 * @param sin        input stream the reader works on; closed here if
 *                   construction fails
 * @param bufferSize value written to {@code io.file.buffer.size} in
 *                   {@code conf}
 * @param conf       configuration to read settings from; note that it is
 *                   modified (buffer size) and retained by the reader
 * @param start      offset to seek to once the header has been processed
 * @param length     number of bytes after {@code start} that belong to this
 *                   reader; {@code start + length} becomes {@code end}
 * @throws IOException if seeking or {@code init()} fails
 */
public Reader(Path file, ScheduledInputStream sin, int bufferSize, Configuration conf,
    long start, long length) throws IOException {
  tolerateCorruptions = conf.getBoolean(TOLERATE_CORRUPTIONS_CONF_STR, false);
  // Side effect on the caller's configuration object.
  conf.setInt("io.file.buffer.size", bufferSize);
  this.file = file;
  this.sin = sin;
  this.conf = conf;
  end = start + length;

  // The guarded region spans the whole remaining constructor body: a failure
  // while parsing metadata or building the projection must release the
  // stream just like a failure in seek()/init() does.
  boolean succeed = false;
  try {
    // init() always runs at offset 0; only afterwards move to the split start.
    seek(0);
    init();
    if (start > 0) {
      seek(start);
    }

    columnNumber = Integer.parseInt(
        metadata.get(new Text(COLUMN_NUMBER_METADATA_STR)).toString());

    java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf);
    // A fresh boolean[] is all false, i.e. "read every column".
    boolean[] skippedColIDs = new boolean[columnNumber];
    if (notSkipIDs.size() > 0) {
      // Skip everything, then re-enable the requested columns. IDs at or
      // beyond the column count are ignored.
      java.util.Arrays.fill(skippedColIDs, true);
      for (int read : notSkipIDs) {
        if (read < columnNumber) {
          skippedColIDs[read] = false;
        }
      }
    }
    // TODO: if no column name is specified e.g, in select count(1) from tt;
    // skip all columns, this should be distinguished from the case:
    // select * from tt;
    // Until then an empty projection list reads every column.

    loadColumnNum = columnNumber;
    for (boolean skippedColID : skippedColIDs) {
      if (skippedColID) {
        loadColumnNum -= 1;
      }
    }

    // revPrjColIDs maps a file column index to its slot in selectedColumns,
    // or to -1 when the column is skipped.
    revPrjColIDs = new int[columnNumber];
    selectedColumns = new SelectedColumn[loadColumnNum];
    colValLenBufferReadIn = new NonSyncDataInputBuffer[loadColumnNum];
    for (int i = 0, j = 0; i < columnNumber; ++i) {
      if (!skippedColIDs[i]) {
        SelectedColumn col = new SelectedColumn();
        col.colIndex = i;
        col.runLength = 0;
        col.prvLength = -1;
        col.rowReadIndex = 0;
        selectedColumns[j] = col;
        colValLenBufferReadIn[j] = new NonSyncDataInputBuffer();
        revPrjColIDs[i] = j;
        j++;
      } else {
        revPrjColIDs[i] = -1;
      }
    }

    currentKey = createKeyBuffer();
    // Lazy decompression is switched off when corruptions are tolerated.
    boolean lazyDecompress = !tolerateCorruptions;
    currentValue = new ValueBuffer(
        null, columnNumber, skippedColIDs, codec, lazyDecompress);

    succeed = true;
  } finally {
    if (!succeed && sin != null) {
      try {
        sin.close();
      } catch (IOException e) {
        // Best effort: the original failure is the one that should propagate.
        if (LOG != null && LOG.isDebugEnabled()) {
          LOG.debug("Exception in closing " + sin, e);
        }
      }
    }
  }
}