in java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java [204:381]
/**
 * Creates a record reader over the stripes of {@code fileReader} selected by {@code options}.
 *
 * <p>Setup proceeds in this order:
 * <ul>
 *   <li>resolve the schema evolution (the file schema is used when {@code options} has no
 *       reader schema);</li>
 *   <li>select the stripes whose start offset lies in
 *       {@code [options.getOffset(), options.getMaxOffset())};</li>
 *   <li>build the data reader, the optional search-argument applier and batch filter, the
 *       root tree reader and the stripe planner;</li>
 *   <li>position the reader at row 0.</li>
 * </ul>
 *
 * @param fileReader the file reader supplying metadata, configuration and possibly an
 *     already-open input stream
 * @param options the read options (schema, search argument, offsets, filters, ...)
 * @throws IOException if setup fails. A failure while positioning at the first row is
 *     wrapped with the first selected stripe id and the file path. The reader is closed
 *     before that exception is thrown, and any failure from that close is attached as a
 *     suppressed exception.
 */
protected RecordReaderImpl(ReaderImpl fileReader,
                           Reader.Options options) throws IOException {
  OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion();
  SchemaEvolution evolution;
  if (options.getSchema() == null) {
    // Parameterized so the schema is only rendered when debug logging is enabled.
    LOG.debug("Reader schema not provided -- using file schema {}",
        fileReader.getSchema());
    evolution = new SchemaEvolution(fileReader.getSchema(), null, options);
  } else {
    // Now that we are creating a record reader for a file, validate that
    // the schema to read is compatible with the file schema.
    //
    evolution = new SchemaEvolution(fileReader.getSchema(),
        options.getSchema(),
        options);
    if (LOG.isDebugEnabled() && evolution.hasConversion()) {
      LOG.debug("ORC file " + fileReader.path.toString() +
          " has data type conversion --\n" +
          "reader schema: " + options.getSchema().toString() + "\n" +
          "file schema: " + fileReader.getSchema());
    }
  }
  this.noSelectedVector = !options.useSelected();
  LOG.debug("noSelectedVector={}", this.noSelectedVector);
  this.schema = evolution.getReaderSchema();
  this.path = fileReader.path;
  this.rowIndexStride = fileReader.rowIndexStride;
  boolean ignoreNonUtf8BloomFilter =
      OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(fileReader.conf);
  ReaderEncryption encryption = fileReader.getEncryption();
  this.fileIncluded = evolution.getFileIncluded();
  SearchArgument sarg = options.getSearchArgument();
  // One flag per file column id. The filter-column loop below marks entries in this
  // same array after it is handed to the SargApplier.
  // NOTE(review): presumably SargApplier keeps the reference and sees those marks — confirm.
  boolean[] rowIndexCols = new boolean[evolution.getFileIncluded().length];
  if (sarg != null && rowIndexStride > 0) {
    sargApp = new SargApplier(sarg,
        rowIndexStride,
        evolution,
        writerVersion,
        fileReader.useUTCTimestamp,
        fileReader.writerUsedProlepticGregorian(),
        fileReader.options.getConvertToProlepticGregorian());
    sargApp.setRowIndexCols(rowIndexCols);
  } else {
    sargApp = null;
  }
  // Stripe selection:
  // - stripes starting before `offset` are skipped, and their rows are counted into firstRow;
  // - stripes starting in [offset, maxOffset) are read;
  // - stripes starting at or after maxOffset are ignored entirely.
  long rows = 0;
  long skippedRows = 0;
  long offset = options.getOffset();
  long maxOffset = options.getMaxOffset();
  for (StripeInformation stripe: fileReader.getStripes()) {
    long stripeStart = stripe.getOffset();
    if (offset > stripeStart) {
      skippedRows += stripe.getNumberOfRows();
    } else if (stripeStart < maxOffset) {
      this.stripes.add(stripe);
      rows += stripe.getNumberOfRows();
    }
  }
  this.maxDiskRangeChunkLimit = OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(fileReader.conf);
  // An explicit option wins over the configuration default.
  Boolean zeroCopy = options.getUseZeroCopy();
  if (zeroCopy == null) {
    zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
  }
  if (options.getDataReader() != null) {
    this.dataReader = options.getDataReader().clone();
  } else {
    InStream.StreamOptions unencryptedOptions =
        InStream.options()
            .withCodec(OrcCodecPool.getCodec(fileReader.getCompressionKind()))
            .withBufferSize(fileReader.getCompressionSize());
    DataReaderProperties.Builder builder =
        DataReaderProperties.builder()
            .withCompression(unencryptedOptions)
            .withFileSystemSupplier(fileReader.getFileSystemSupplier())
            .withPath(fileReader.path)
            .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
            .withZeroCopy(zeroCopy)
            .withMinSeekSize(options.minSeekSize())
            .withMinSeekSizeTolerance(options.minSeekSizeTolerance());
    // Reuse the file reader's already-open stream, if it hands one over.
    FSDataInputStream file = fileReader.takeFile();
    if (file != null) {
      builder.withFile(file);
    }
    this.dataReader = RecordReaderUtils.createDefaultDataReader(
        builder.build());
  }
  // NOTE(review): if anything between here and the try block below throws, nothing in this
  // constructor releases dataReader — confirm whether callers or close() cover that case.
  firstRow = skippedRows;
  totalRowCount = rows;
  Boolean skipCorrupt = options.getSkipCorruptRecords();
  if (skipCorrupt == null) {
    skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
  }
  String[] filterCols = null;
  Consumer<OrcFilterContext> filterCallBack = null;
  String filePath = options.allowPluginFilters() ?
      fileReader.getFileSystem().makeQualified(fileReader.path).toString() : null;
  BatchFilter filter = FilterFactory.createBatchFilter(options,
      evolution.getReaderBaseSchema(),
      evolution.isSchemaEvolutionCaseAware(),
      fileReader.getFileVersion(),
      false,
      filePath,
      fileReader.conf);
  if (filter != null) {
    // If a filter is determined then use this
    filterCallBack = filter;
    filterCols = filter.getColumnNames();
  }
  // Map columnNames to ColumnIds
  SortedSet<Integer> filterColIds = new TreeSet<>();
  if (filterCols != null) {
    for (String colName : filterCols) {
      TypeDescription expandCol = findColumnType(evolution, colName);
      // If the column is not present in the file then this can be ignored from read.
      if (expandCol == null || expandCol.getId() == -1) {
        // Add -1 to filter columns so that the NullTreeReader is invoked during the LEADERS phase
        filterColIds.add(-1);
        // Determine the common parent and include these
        expandCol = findMostCommonColumn(evolution, colName);
      }
      while (expandCol != null && expandCol.getId() != -1) {
        // classify the column and the parent branch as LEAD
        filterColIds.add(expandCol.getId());
        rowIndexCols[expandCol.getId()] = true;
        expandCol = expandCol.getParent();
      }
    }
    this.startReadPhase = TypeReader.ReadPhase.LEADERS;
    LOG.debug("Using startReadPhase: {} with filter columns: {}", startReadPhase, filterColIds);
  } else {
    this.startReadPhase = TypeReader.ReadPhase.ALL;
  }
  // Only expose the row-index column flags when at least one column was marked.
  var hasTrue = false;
  for (boolean value: rowIndexCols) {
    if (value) {
      hasTrue = true;
      break;
    }
  }
  this.rowIndexColsToRead = hasTrue ? rowIndexCols : null;
  TreeReaderFactory.ReaderContext readerContext =
      new TreeReaderFactory.ReaderContext()
          .setSchemaEvolution(evolution)
          .setFilterCallback(filterColIds, filterCallBack)
          .skipCorrupt(skipCorrupt)
          .fileFormat(fileReader.getFileVersion())
          .useUTCTimestamp(fileReader.useUTCTimestamp)
          .setProlepticGregorian(fileReader.writerUsedProlepticGregorian(),
              fileReader.options.getConvertToProlepticGregorian())
          .setEncryption(encryption);
  reader = TreeReaderFactory.createRootReader(evolution.getReaderSchema(), readerContext);
  skipBloomFilters = hasBadBloomFilters(fileReader.getFileTail().getFooter());
  // Index slots are sized by the file schema's maximum column id.
  int columns = evolution.getFileSchema().getMaximumId() + 1;
  indexes = new OrcIndex(new OrcProto.RowIndex[columns],
      new OrcProto.Stream.Kind[columns],
      new OrcProto.BloomFilterIndex[columns]);
  planner = new StripePlanner(evolution.getFileSchema(), encryption,
      dataReader, writerVersion, ignoreNonUtf8BloomFilter,
      maxDiskRangeChunkLimit, filterColIds);
  try {
    advanceToNextRow(reader, 0L, true);
  } catch (Exception e) {
    // Try to close since this happens in constructor. A failure while closing must not
    // replace the original cause, so it is attached to it as a suppressed exception.
    try {
      close();
    } catch (Exception closeError) {
      e.addSuppressed(closeError);
    }
    long stripeId = stripes.size() == 0 ? 0 : stripes.get(0).getStripeId();
    throw new IOException(String.format("Problem opening stripe %d footer in %s.",
        stripeId, path), e);
  }
}