in paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java [227:419]
/**
 * Creates a record reader over the stripes of {@code fileReader} selected by the offset range in
 * {@code options}, and positions it on the first row to read.
 *
 * <p>Stripes whose start offset is before {@code options.getOffset()} are skipped, and their row
 * counts accumulate into {@code firstRow}. Stripes starting at or after that offset and before
 * {@code options.getMaxOffset()} are read. All later stripes are ignored. When {@code
 * options.getSchema()} is {@code null}, the file schema is used as the reader schema.
 *
 * @param fileReader the ORC file reader supplying schema, stripes, footer, configuration and
 *     (optionally) an already-open input stream
 * @param options read options (schema, search argument, offsets, filters, data reader override)
 * @param selection optional row selection bitmap, stored as-is; may be {@code null}
 * @throws IOException if positioning on the first row fails (the cause is attached), or if an I/O
 *     step of the setup fails
 */
public RecordReaderImpl(
        ReaderImpl fileReader, Reader.Options options, @Nullable RoaringBitmap32 selection)
        throws IOException {
    this.selection = selection;
    OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion();
    SchemaEvolution evolution;
    if (options.getSchema() == null) {
        LOG.info("Reader schema not provided -- using file schema {}", fileReader.getSchema());
        evolution = new SchemaEvolution(fileReader.getSchema(), null, options);
    } else {
        // Now that we are creating a record reader for a file, validate that
        // the schema to read is compatible with the file schema.
        evolution = new SchemaEvolution(fileReader.getSchema(), options.getSchema(), options);
        if (LOG.isDebugEnabled() && evolution.hasConversion()) {
            LOG.debug(
                    "ORC file "
                            + fileReader.path.toString()
                            + " has data type conversion --\n"
                            + "reader schema: "
                            + options.getSchema().toString()
                            + "\n"
                            + "file schema: "
                            + fileReader.getSchema());
        }
    }
    this.noSelectedVector = !options.useSelected();
    LOG.debug("noSelectedVector={}", this.noSelectedVector);
    this.schema = evolution.getReaderSchema();
    this.path = fileReader.path;
    this.rowIndexStride = fileReader.rowIndexStride;
    boolean ignoreNonUtf8BloomFilter =
            OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(fileReader.conf);
    ReaderEncryption encryption = fileReader.getEncryption();
    this.fileIncluded = evolution.getFileIncluded();
    SearchArgument sarg = options.getSearchArgument();
    // One flag per file column id. It is handed to the SargApplier (when present) and is
    // additionally marked for the filter columns further below.
    boolean[] rowIndexCols = new boolean[fileIncluded.length];
    if (sarg != null && rowIndexStride > 0) {
        sargApp =
                new SargApplier(
                        sarg,
                        rowIndexStride,
                        evolution,
                        writerVersion,
                        fileReader.useUTCTimestamp,
                        fileReader.writerUsedProlepticGregorian(),
                        fileReader.options.getConvertToProlepticGregorian());
        sargApp.setRowIndexCols(rowIndexCols);
    } else {
        sargApp = null;
    }

    // Select the stripes to read from the [offset, maxOffset) byte range of the request.
    long rows = 0;
    long skippedRows = 0;
    long offset = options.getOffset();
    long maxOffset = options.getMaxOffset();
    for (StripeInformation stripe : fileReader.getStripes()) {
        long stripeStart = stripe.getOffset();
        if (offset > stripeStart) {
            skippedRows += stripe.getNumberOfRows();
        } else if (stripeStart < maxOffset) {
            this.stripes.add(stripe);
            rows += stripe.getNumberOfRows();
        }
    }
    firstRow = skippedRows;
    totalRowCount = rows;

    this.maxDiskRangeChunkLimit =
            OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(fileReader.conf);
    Boolean skipCorrupt = options.getSkipCorruptRecords();
    if (skipCorrupt == null) {
        skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
    }

    String[] filterCols = null;
    Consumer<OrcFilterContext> filterCallBack = null;
    String filePath =
            options.allowPluginFilters()
                    ? fileReader.getFileSystem().makeQualified(fileReader.path).toString()
                    : null;
    BatchFilter filter =
            FilterFactory.createBatchFilter(
                    options,
                    evolution.getReaderBaseSchema(),
                    evolution.isSchemaEvolutionCaseAware(),
                    fileReader.getFileVersion(),
                    false,
                    filePath,
                    fileReader.conf);
    if (filter != null) {
        // If a filter is determined then use this
        filterCallBack = filter;
        filterCols = filter.getColumnNames();
    }
    // Map columnNames to ColumnIds
    SortedSet<Integer> filterColIds = new TreeSet<>();
    if (filterCols != null) {
        for (String colName : filterCols) {
            TypeDescription expandCol = findColumnType(evolution, colName);
            // If the column is not present in the file then this can be ignored from read.
            if (expandCol == null || expandCol.getId() == -1) {
                // Add -1 to filter columns so that the NullTreeReader is invoked during the
                // LEADERS phase
                filterColIds.add(-1);
                // Determine the common parent and include these
                expandCol = findMostCommonColumn(evolution, colName);
            }
            while (expandCol != null && expandCol.getId() != -1) {
                // classify the column and the parent branch as LEAD
                filterColIds.add(expandCol.getId());
                rowIndexCols[expandCol.getId()] = true;
                expandCol = expandCol.getParent();
            }
        }
        this.startReadPhase = TypeReader.ReadPhase.LEADERS;
        LOG.debug(
                "Using startReadPhase: {} with filter columns: {}",
                startReadPhase,
                filterColIds);
    } else {
        this.startReadPhase = TypeReader.ReadPhase.ALL;
    }
    this.rowIndexColsToRead = ArrayUtils.contains(rowIndexCols, true) ? rowIndexCols : null;
    TreeReaderFactory.ReaderContext readerContext =
            new TreeReaderFactory.ReaderContext()
                    .setSchemaEvolution(evolution)
                    .setFilterCallback(filterColIds, filterCallBack)
                    .skipCorrupt(skipCorrupt)
                    .fileFormat(fileReader.getFileVersion())
                    .useUTCTimestamp(fileReader.useUTCTimestamp)
                    .setProlepticGregorian(
                            fileReader.writerUsedProlepticGregorian(),
                            fileReader.options.getConvertToProlepticGregorian())
                    .setEncryption(encryption);
    reader = TreeReaderFactory.createRootReader(evolution.getReaderSchema(), readerContext);
    skipBloomFilters = hasBadBloomFilters(fileReader.getFileTail().getFooter());
    int columns = evolution.getFileSchema().getMaximumId() + 1;
    indexes =
            new OrcIndex(
                    new OrcProto.RowIndex[columns],
                    new OrcProto.Stream.Kind[columns],
                    new OrcProto.BloomFilterIndex[columns]);

    // The data reader is created only after every setup step above that can throw. It may take
    // over the stream from fileReader.takeFile(), and a failure between its creation and the
    // guarded block below would leave it unreachable and unclosed.
    if (options.getDataReader() != null) {
        this.dataReader = options.getDataReader().clone();
    } else {
        Boolean zeroCopy = options.getUseZeroCopy();
        if (zeroCopy == null) {
            zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
        }
        InStream.StreamOptions unencryptedOptions =
                InStream.options()
                        .withCodec(OrcCodecPool.getCodec(fileReader.getCompressionKind()))
                        .withBufferSize(fileReader.getCompressionSize());
        DataReaderProperties.Builder builder =
                DataReaderProperties.builder()
                        .withCompression(unencryptedOptions)
                        .withFileSystemSupplier(fileReader.getFileSystemSupplier())
                        .withPath(fileReader.path)
                        .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
                        .withZeroCopy(zeroCopy)
                        .withMinSeekSize(options.minSeekSize())
                        .withMinSeekSizeTolerance(options.minSeekSizeTolerance());
        FSDataInputStream file = fileReader.takeFile();
        if (file != null) {
            builder.withFile(file);
        }
        this.dataReader = RecordReaderUtils.createDefaultDataReader(builder.build());
    }
    planner =
            new StripePlanner(
                    evolution.getFileSchema(),
                    encryption,
                    dataReader,
                    writerVersion,
                    ignoreNonUtf8BloomFilter,
                    maxDiskRangeChunkLimit,
                    filterColIds);
    try {
        advanceToNextRow(reader, 0L, true);
    } catch (Exception e) {
        long stripeId = stripes.isEmpty() ? 0 : stripes.get(0).getStripeId();
        IOException failure =
                new IOException(
                        String.format("Problem opening stripe %d footer in %s.", stripeId, path),
                        e);
        // Try to close since this happens in constructor. A failure while closing is attached
        // as suppressed so it cannot hide the original error.
        try {
            close();
        } catch (Exception closeError) {
            failure.addSuppressed(closeError);
        }
        throw failure;
    }
}