private ConnectorPageSource createPageSource()

in src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java [159:276]


    /**
     * Creates the page source used to read a single split.
     *
     * <p>If {@code checkRawFile} accepts the raw files of the decoded split, every raw file is
     * read directly through {@code createDataPageSource} and the per-file sources are handed to
     * a {@link DirectTrinoPageSource}. On that path:
     *
     * <ul>
     *   <li>when file-index reading is enabled in the table's core options and a Paimon filter
     *       could be derived, a file is skipped if evaluating its index against the filter
     *       reports that nothing remains;
     *   <li>when the split carries deletion files, each source is wrapped with the deletion
     *       vector of its file (or an empty optional if the file has none);
     *   <li>{@code limit} is not consulted.
     * </ul>
     *
     * <p>Otherwise the split is read through the table's {@link ReadBuilder} ("old read way"),
     * with the filter and projection pushed into the reader and {@code limit} passed to the
     * resulting {@link TrinoPageSource}.
     *
     * @param session session used to create the Trino file system
     * @param table table being read; cast to {@link FileStoreTable} on the raw-file path
     * @param filter Trino-side column constraints, converted to a Paimon predicate if possible
     * @param split the serialized split to decode and read
     * @param columns the projected columns; every element must be a {@link TrinoColumnHandle}
     * @param limit row limit forwarded to {@link TrinoPageSource} on the fallback path
     * @return a page source over the split
     * @throws RuntimeException wrapping any exception raised while decoding the split or opening
     *     the readers
     */
    private ConnectorPageSource createPageSource(
            ConnectorSession session,
            Table table,
            TupleDomain<TrinoColumnHandle> filter,
            TrinoSplit split,
            List<ColumnHandle> columns,
            OptionalLong limit) {
        RowType rowType = table.rowType();
        List<String> fieldNames = rowType.getFieldNames();
        List<String> projectedFields =
                columns.stream()
                        .map(TrinoColumnHandle.class::cast)
                        .map(TrinoColumnHandle::getColumnName)
                        .toList();
        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
        Optional<Predicate> paimonFilter = new TrinoFilterConverter(rowType).convert(filter);

        try {
            Split paimonSplit = split.decodeSplit();
            Optional<List<RawFile>> optionalRawFiles = paimonSplit.convertToRawFiles();

            if (!checkRawFile(optionalRawFiles)) {
                // old read way
                ReadBuilder read = table.newReadBuilder();
                paimonFilter.ifPresent(read::withFilter);

                if (!fieldNames.equals(projectedFields)) {
                    int[] columnIndex =
                            projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
                    read.withProjection(columnIndex);
                }

                return new TrinoPageSource(
                        read.newRead().executeFilter().createReader(paimonSplit), columns, limit);
            }

            FileStoreTable fileStoreTable = (FileStoreTable) table;
            boolean readIndex = fileStoreTable.coreOptions().fileIndexReadEnabled();

            Optional<List<DeletionFile>> deletionFiles = paimonSplit.deletionFiles();
            Optional<List<IndexFile>> indexFiles =
                    readIndex ? paimonSplit.indexFiles() : Optional.empty();
            SchemaManager schemaManager =
                    new SchemaManager(fileStoreTable.fileIO(), fileStoreTable.location());
            List<Type> types =
                    columns.stream()
                            .map(s -> ((TrinoColumnHandle) s).getTrinoType())
                            .collect(Collectors.toList());

            List<RawFile> files = optionalRawFiles.orElseThrow();
            LinkedList<ConnectorPageSource> sources = new LinkedList<>();

            try {
                for (int i = 0; i < files.size(); i++) {
                    RawFile rawFile = files.get(i);

                    // if file index exists, do the filter.
                    if (indexFiles.isPresent()) {
                        IndexFile indexFile = indexFiles.get().get(i);
                        if (indexFile != null && paimonFilter.isPresent()) {
                            try (FileIndexPredicate fileIndexPredicate =
                                    new FileIndexPredicate(
                                            new Path(indexFile.path()),
                                            fileStoreTable.fileIO(),
                                            rowType)) {
                                if (!fileIndexPredicate.evaluate(paimonFilter.get()).remain()) {
                                    continue;
                                }
                            }
                        }
                    }

                    // Read the deletion vector before opening the data file, so that a failed
                    // read cannot leave a freshly opened page source behind.
                    Optional<DeletionVector> deletionVector = Optional.empty();
                    if (deletionFiles.isPresent()) {
                        DeletionFile deletionFile = deletionFiles.get().get(i);
                        if (deletionFile != null) {
                            deletionVector =
                                    Optional.ofNullable(
                                            DeletionVector.read(
                                                    fileStoreTable.fileIO(), deletionFile));
                        }
                    }

                    // map table column name to data column name, if column does not exist in
                    // data columns, set it to null; columns those set to null will generate
                    // a null vector in orc page
                    List<String> dataFieldNames =
                            fileStoreTable.schema().id() == rawFile.schemaId()
                                    ? projectedFields
                                    : schemaEvolutionFieldNames(
                                            projectedFields,
                                            rowType.getFields(),
                                            schemaManager.schema(rawFile.schemaId()).fields());

                    ConnectorPageSource source =
                            createDataPageSource(
                                    rawFile.format(),
                                    fileSystem.newInputFile(Location.of(rawFile.path())),
                                    fileStoreTable.coreOptions(),
                                    dataFieldNames,
                                    types,
                                    orderDomains(projectedFields, filter));

                    if (deletionFiles.isPresent()) {
                        source = TrinoPageSourceWrapper.wrap(source, deletionVector);
                    }
                    sources.add(source);
                }

                return new DirectTrinoPageSource(sources);
            } catch (Throwable failure) {
                // Sources opened for earlier files would otherwise never be closed.
                closeAfterFailure(sources, failure);
                throw failure;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Closes every page source in {@code sources} after {@code failure} aborted their creation.
     * Exceptions raised while closing are attached to {@code failure} as suppressed exceptions
     * so the original error stays the primary one.
     */
    private static void closeAfterFailure(List<ConnectorPageSource> sources, Throwable failure) {
        for (ConnectorPageSource source : sources) {
            try {
                source.close();
            } catch (Exception closeFailure) {
                // addSuppressed rejects self-suppression, so guard against the same instance.
                if (closeFailure != failure) {
                    failure.addSuppressed(closeFailure);
                }
            }
        }
    }