in src/main/java/org/apache/paimon/trino/TrinoPageSourceProvider.java [99:157]
public ConnectorPageSource createPageSource(
        ConnectorTransactionHandle transaction,
        ConnectorSession session,
        ConnectorSplit split,
        ConnectorTableHandle tableHandle,
        List<ColumnHandle> columns,
        DynamicFilter dynamicFilter) {
    // Creates the page source for a single split of the given table handle.
    //
    // The session is registered with the catalog and the table is resolved with its
    // dynamic options first; everything else runs under this class's class loader.
    // When one of the requested columns is flagged as a row id, the delegate page
    // source is built from the remaining columns only and wrapped with
    // TrinoMergePageSourceWrapper; otherwise the delegate is returned as is.
    // The transaction and dynamicFilter arguments are not used here.
    trinoCatalog.initSession(session);
    TrinoTableHandle handle = (TrinoTableHandle) tableHandle;
    Table table = handle.tableWithDynamicOptions(trinoCatalog, session);
    return runWithContextClassLoader(
            () -> {
                Optional<TrinoColumnHandle> rowIdColumn =
                        columns.stream()
                                .map(TrinoColumnHandle.class::cast)
                                .filter(TrinoColumnHandle::isRowId)
                                .findFirst();

                // Plain read: no row-id column was requested, so no wrapping is needed.
                if (!rowIdColumn.isPresent()) {
                    return createPageSource(
                            session,
                            table,
                            handle.getFilter(),
                            (TrinoSplit) split,
                            columns,
                            handle.getLimit());
                }

                // The row-id column is not handed to the delegate page source.
                List<ColumnHandle> dataColumns =
                        columns.stream()
                                .map(TrinoColumnHandle.class::cast)
                                .filter(candidate -> !candidate.isRowId())
                                .collect(Collectors.toList());
                HashMap<String, Integer> fieldPositions =
                        indexRowIdFields(rowIdColumn.get(), dataColumns);

                return TrinoMergePageSourceWrapper.wrap(
                        createPageSource(
                                session,
                                table,
                                handle.getFilter(),
                                (TrinoSplit) split,
                                dataColumns,
                                handle.getLimit()),
                        fieldPositions);
            },
            TrinoPageSourceProvider.class.getClassLoader());
}

/**
 * Maps each field name of the row-id column's row type to the position of the same-named
 * column in {@code dataColumns}.
 *
 * <p>Row-type fields with no same-named data column get no entry. If several data columns
 * share a name, the last position wins.
 *
 * @param rowIdColumn the column flagged as row id; its Trino type is cast to a row type
 * @param dataColumns the non-row-id columns, in the order given to the delegate page source
 * @return field name to position within {@code dataColumns}
 */
private static HashMap<String, Integer> indexRowIdFields(
        TrinoColumnHandle rowIdColumn, List<ColumnHandle> dataColumns) {
    io.trino.spi.type.RowType rowIdType =
            (io.trino.spi.type.RowType) rowIdColumn.getTrinoType();
    Set<String> rowIdFieldNames =
            rowIdType.getFields().stream()
                    .map(field -> field.getName().get())
                    .collect(Collectors.toSet());

    HashMap<String, Integer> positions = new HashMap<>();
    int position = 0;
    for (ColumnHandle dataColumn : dataColumns) {
        String columnName = ((TrinoColumnHandle) dataColumn).getColumnName();
        if (rowIdFieldNames.contains(columnName)) {
            positions.put(columnName, position);
        }
        position++;
    }
    return positions;
}