public RecordReaderImpl()

in paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java [227:419]


    public RecordReaderImpl(
            ReaderImpl fileReader, Reader.Options options, @Nullable RoaringBitmap32 selection)
            throws IOException {
        this.selection = selection;
        OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion();
        SchemaEvolution evolution;
        if (options.getSchema() == null) {
            LOG.info("Reader schema not provided -- using file schema " + fileReader.getSchema());
            evolution = new SchemaEvolution(fileReader.getSchema(), null, options);
        } else {

            // Now that we are creating a record reader for a file, validate that
            // the schema to read is compatible with the file schema.
            //
            evolution = new SchemaEvolution(fileReader.getSchema(), options.getSchema(), options);
            if (LOG.isDebugEnabled() && evolution.hasConversion()) {
                LOG.debug(
                        "ORC file "
                                + fileReader.path.toString()
                                + " has data type conversion --\n"
                                + "reader schema: "
                                + options.getSchema().toString()
                                + "\n"
                                + "file schema:   "
                                + fileReader.getSchema());
            }
        }
        this.noSelectedVector = !options.useSelected();
        LOG.debug("noSelectedVector={}", this.noSelectedVector);
        this.schema = evolution.getReaderSchema();
        this.path = fileReader.path;
        this.rowIndexStride = fileReader.rowIndexStride;
        boolean ignoreNonUtf8BloomFilter =
                OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(fileReader.conf);
        ReaderEncryption encryption = fileReader.getEncryption();
        this.fileIncluded = evolution.getFileIncluded();
        SearchArgument sarg = options.getSearchArgument();
        boolean[] rowIndexCols = new boolean[evolution.getFileIncluded().length];
        if (sarg != null && rowIndexStride > 0) {
            sargApp =
                    new SargApplier(
                            sarg,
                            rowIndexStride,
                            evolution,
                            writerVersion,
                            fileReader.useUTCTimestamp,
                            fileReader.writerUsedProlepticGregorian(),
                            fileReader.options.getConvertToProlepticGregorian());
            sargApp.setRowIndexCols(rowIndexCols);
        } else {
            sargApp = null;
        }

        long rows = 0;
        long skippedRows = 0;
        long offset = options.getOffset();
        long maxOffset = options.getMaxOffset();
        for (StripeInformation stripe : fileReader.getStripes()) {
            long stripeStart = stripe.getOffset();
            if (offset > stripeStart) {
                skippedRows += stripe.getNumberOfRows();
            } else if (stripeStart < maxOffset) {
                this.stripes.add(stripe);
                rows += stripe.getNumberOfRows();
            }
        }
        this.maxDiskRangeChunkLimit =
                OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(fileReader.conf);
        Boolean zeroCopy = options.getUseZeroCopy();
        if (zeroCopy == null) {
            zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
        }
        if (options.getDataReader() != null) {
            this.dataReader = options.getDataReader().clone();
        } else {
            InStream.StreamOptions unencryptedOptions =
                    InStream.options()
                            .withCodec(OrcCodecPool.getCodec(fileReader.getCompressionKind()))
                            .withBufferSize(fileReader.getCompressionSize());
            DataReaderProperties.Builder builder =
                    DataReaderProperties.builder()
                            .withCompression(unencryptedOptions)
                            .withFileSystemSupplier(fileReader.getFileSystemSupplier())
                            .withPath(fileReader.path)
                            .withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
                            .withZeroCopy(zeroCopy)
                            .withMinSeekSize(options.minSeekSize())
                            .withMinSeekSizeTolerance(options.minSeekSizeTolerance());
            FSDataInputStream file = fileReader.takeFile();
            if (file != null) {
                builder.withFile(file);
            }
            this.dataReader = RecordReaderUtils.createDefaultDataReader(builder.build());
        }
        firstRow = skippedRows;
        totalRowCount = rows;
        Boolean skipCorrupt = options.getSkipCorruptRecords();
        if (skipCorrupt == null) {
            skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
        }

        String[] filterCols = null;
        Consumer<OrcFilterContext> filterCallBack = null;
        String filePath =
                options.allowPluginFilters()
                        ? fileReader.getFileSystem().makeQualified(fileReader.path).toString()
                        : null;
        BatchFilter filter =
                FilterFactory.createBatchFilter(
                        options,
                        evolution.getReaderBaseSchema(),
                        evolution.isSchemaEvolutionCaseAware(),
                        fileReader.getFileVersion(),
                        false,
                        filePath,
                        fileReader.conf);
        if (filter != null) {
            // If a filter is determined then use this
            filterCallBack = filter;
            filterCols = filter.getColumnNames();
        }

        // Map columnNames to ColumnIds
        SortedSet<Integer> filterColIds = new TreeSet<>();
        if (filterCols != null) {
            for (String colName : filterCols) {
                TypeDescription expandCol = findColumnType(evolution, colName);
                // If the column is not present in the file then this can be ignored from read.
                if (expandCol == null || expandCol.getId() == -1) {
                    // Add -1 to filter columns so that the NullTreeReader is invoked during the
                    // LEADERS phase
                    filterColIds.add(-1);
                    // Determine the common parent and include these
                    expandCol = findMostCommonColumn(evolution, colName);
                }
                while (expandCol != null && expandCol.getId() != -1) {
                    // classify the column and the parent branch as LEAD
                    filterColIds.add(expandCol.getId());
                    rowIndexCols[expandCol.getId()] = true;
                    expandCol = expandCol.getParent();
                }
            }
            this.startReadPhase = TypeReader.ReadPhase.LEADERS;
            LOG.debug(
                    "Using startReadPhase: {} with filter columns: {}",
                    startReadPhase,
                    filterColIds);
        } else {
            this.startReadPhase = TypeReader.ReadPhase.ALL;
        }

        this.rowIndexColsToRead = ArrayUtils.contains(rowIndexCols, true) ? rowIndexCols : null;
        TreeReaderFactory.ReaderContext readerContext =
                new TreeReaderFactory.ReaderContext()
                        .setSchemaEvolution(evolution)
                        .setFilterCallback(filterColIds, filterCallBack)
                        .skipCorrupt(skipCorrupt)
                        .fileFormat(fileReader.getFileVersion())
                        .useUTCTimestamp(fileReader.useUTCTimestamp)
                        .setProlepticGregorian(
                                fileReader.writerUsedProlepticGregorian(),
                                fileReader.options.getConvertToProlepticGregorian())
                        .setEncryption(encryption);
        reader = TreeReaderFactory.createRootReader(evolution.getReaderSchema(), readerContext);
        skipBloomFilters = hasBadBloomFilters(fileReader.getFileTail().getFooter());

        int columns = evolution.getFileSchema().getMaximumId() + 1;
        indexes =
                new OrcIndex(
                        new OrcProto.RowIndex[columns],
                        new OrcProto.Stream.Kind[columns],
                        new OrcProto.BloomFilterIndex[columns]);

        planner =
                new StripePlanner(
                        evolution.getFileSchema(),
                        encryption,
                        dataReader,
                        writerVersion,
                        ignoreNonUtf8BloomFilter,
                        maxDiskRangeChunkLimit,
                        filterColIds);

        try {
            advanceToNextRow(reader, 0L, true);
        } catch (Exception e) {
            // Try to close since this happens in constructor.
            close();
            long stripeId = stripes.size() == 0 ? 0 : stripes.get(0).getStripeId();
            throw new IOException(
                    String.format("Problem opening stripe %d footer in %s.", stripeId, path), e);
        }
    }