in src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java [159:276]
/**
 * Creates the page source that reads one Paimon split.
 *
 * <p>When {@code checkRawFile} accepts the split's raw files, each file is opened directly with a
 * Trino file-format reader (see {@link #createRawFilePageSource}). Otherwise the split is read
 * through Paimon's {@link ReadBuilder} and wrapped in a {@link TrinoPageSource}.
 *
 * @param session the connector session, used to create the Trino file system
 * @param table the Paimon table being read; it is cast to {@link FileStoreTable} when the raw-file
 *     path is taken
 * @param filter the Trino predicate; it is converted to a Paimon predicate used for filter
 *     pushdown and file-index pruning
 * @param split the Trino split carrying the encoded Paimon split
 * @param columns the projected columns; each must be a {@link TrinoColumnHandle}
 * @param limit the row limit, forwarded to {@link TrinoPageSource} on the reader path only
 * @return a page source over the projected columns
 * @throws RuntimeException wrapping any checked exception raised while decoding the split or
 *     opening files; unchecked exceptions are propagated unchanged
 */
private ConnectorPageSource createPageSource(
        ConnectorSession session,
        Table table,
        TupleDomain<TrinoColumnHandle> filter,
        TrinoSplit split,
        List<ColumnHandle> columns,
        OptionalLong limit) {
    RowType rowType = table.rowType();
    List<String> projectedFields =
            columns.stream()
                    .map(TrinoColumnHandle.class::cast)
                    .map(TrinoColumnHandle::getColumnName)
                    .toList();
    TrinoFileSystem fileSystem = fileSystemFactory.create(session);
    Optional<Predicate> paimonFilter = new TrinoFilterConverter(rowType).convert(filter);
    try {
        Split paimonSplit = split.decodeSplit();
        Optional<List<RawFile>> optionalRawFiles = paimonSplit.convertToRawFiles();
        if (checkRawFile(optionalRawFiles)) {
            return createRawFilePageSource(
                    (FileStoreTable) table,
                    fileSystem,
                    rowType,
                    filter,
                    paimonFilter,
                    paimonSplit,
                    optionalRawFiles.orElseThrow(),
                    columns,
                    projectedFields);
        }

        // Fall back to Paimon's own reader when checkRawFile rejects the split's raw files.
        List<String> fieldNames = rowType.getFieldNames();
        ReadBuilder read = table.newReadBuilder();
        paimonFilter.ifPresent(read::withFilter);
        if (!fieldNames.equals(projectedFields)) {
            int[] columnIndex = projectedFields.stream().mapToInt(fieldNames::indexOf).toArray();
            read.withProjection(columnIndex);
        }
        return new TrinoPageSource(
                read.newRead().executeFilter().createReader(paimonSplit), columns, limit);
    } catch (RuntimeException e) {
        // Propagate unchecked exceptions as-is so their type (and any error code they carry)
        // is not hidden behind a generic wrapper.
        throw e;
    } catch (Exception e) {
        // Wrap checked exceptions exactly once.
        throw new RuntimeException(e);
    }
}

/**
 * Opens one Trino page source per raw data file of the split and chains them into a {@link
 * DirectTrinoPageSource}.
 *
 * <p>A file is skipped when its file index reports that it need not be read (see {@link
 * #isPrunedByFileIndex}). For a file written under a schema other than the table's current one,
 * the projected column names are mapped through {@code schemaEvolutionFieldNames}. When the split
 * carries deletion files, each source is wrapped with its deletion vector via {@code
 * TrinoPageSourceWrapper}.
 *
 * <p>If opening any file fails, every page source opened so far is closed before the exception
 * propagates. Close failures are attached to it as suppressed exceptions.
 *
 * @throws Exception any exception raised while reading index or deletion files or opening data
 *     files
 */
private ConnectorPageSource createRawFilePageSource(
        FileStoreTable fileStoreTable,
        TrinoFileSystem fileSystem,
        RowType rowType,
        TupleDomain<TrinoColumnHandle> filter,
        Optional<Predicate> paimonFilter,
        Split paimonSplit,
        List<RawFile> files,
        List<ColumnHandle> columns,
        List<String> projectedFields)
        throws Exception {
    var coreOptions = fileStoreTable.coreOptions();
    Optional<List<DeletionFile>> deletionFiles = paimonSplit.deletionFiles();
    Optional<List<IndexFile>> indexFiles =
            coreOptions.fileIndexReadEnabled() ? paimonSplit.indexFiles() : Optional.empty();
    SchemaManager schemaManager =
            new SchemaManager(fileStoreTable.fileIO(), fileStoreTable.location());
    List<Type> types =
            columns.stream()
                    .map(TrinoColumnHandle.class::cast)
                    .map(TrinoColumnHandle::getTrinoType)
                    .collect(Collectors.toList());
    // Inputs that do not depend on the individual file: compute them once per split.
    long currentSchemaId = fileStoreTable.schema().id();
    var orderedDomains = orderDomains(projectedFields, filter);

    LinkedList<ConnectorPageSource> sources = new LinkedList<>();
    try {
        for (int i = 0; i < files.size(); i++) {
            RawFile rawFile = files.get(i);
            if (isPrunedByFileIndex(fileStoreTable, rowType, indexFiles, i, paimonFilter)) {
                continue;
            }
            // Read the deletion vector before opening the data file, so a failed read cannot
            // leave a freshly opened page source unclosed.
            Optional<DeletionVector> deletionVector =
                    deletionFiles.isPresent()
                            ? readDeletionVector(fileStoreTable, deletionFiles.get().get(i))
                            : Optional.empty();
            // Map table column names to the file's data column names. Columns that do not
            // exist in the file's schema are set to null and produce a null vector.
            List<String> dataFieldNames =
                    rawFile.schemaId() == currentSchemaId
                            ? projectedFields
                            : schemaEvolutionFieldNames(
                                    projectedFields,
                                    rowType.getFields(),
                                    schemaManager.schema(rawFile.schemaId()).fields());
            ConnectorPageSource source =
                    createDataPageSource(
                            rawFile.format(),
                            fileSystem.newInputFile(Location.of(rawFile.path())),
                            coreOptions,
                            dataFieldNames,
                            types,
                            orderedDomains);
            if (deletionFiles.isPresent()) {
                source = TrinoPageSourceWrapper.wrap(source, deletionVector);
            }
            sources.add(source);
        }
        return new DirectTrinoPageSource(sources);
    } catch (Exception e) {
        closeOpenedSourcesOnFailure(sources, e);
        throw e;
    }
}

/**
 * Returns {@code true} when the file index of the {@code fileOrdinal}-th raw file reports (via
 * {@code remain() == false}) that the file need not be read for {@code paimonFilter}. Returns
 * {@code false} when there is no filter, no index list, or no index entry for that file.
 */
private static boolean isPrunedByFileIndex(
        FileStoreTable fileStoreTable,
        RowType rowType,
        Optional<List<IndexFile>> indexFiles,
        int fileOrdinal,
        Optional<Predicate> paimonFilter)
        throws Exception {
    if (indexFiles.isEmpty() || paimonFilter.isEmpty()) {
        return false;
    }
    IndexFile indexFile = indexFiles.get().get(fileOrdinal);
    if (indexFile == null) {
        return false;
    }
    try (FileIndexPredicate fileIndexPredicate =
            new FileIndexPredicate(new Path(indexFile.path()), fileStoreTable.fileIO(), rowType)) {
        return !fileIndexPredicate.evaluate(paimonFilter.get()).remain();
    }
}

/** Reads the deletion vector for one raw file; a {@code null} deletion file yields empty. */
private static Optional<DeletionVector> readDeletionVector(
        FileStoreTable fileStoreTable, DeletionFile deletionFile) throws IOException {
    if (deletionFile == null) {
        return Optional.empty();
    }
    return Optional.ofNullable(DeletionVector.read(fileStoreTable.fileIO(), deletionFile));
}

/** Closes every opened source, recording close failures as suppressed on {@code primary}. */
private static void closeOpenedSourcesOnFailure(
        List<ConnectorPageSource> sources, Exception primary) {
    for (ConnectorPageSource opened : sources) {
        try {
            opened.close();
        } catch (Exception closeFailure) {
            primary.addSuppressed(closeFailure);
        }
    }
}