in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [268:401]
public ConnectorPageSource createPageSource(
ConnectorTransactionHandle transaction,
ConnectorSession session,
ConnectorSplit connectorSplit,
ConnectorTableHandle connectorTable,
List<ColumnHandle> columns,
DynamicFilter dynamicFilter,
Map<Integer, Optional<String>> idToConstant,
boolean useIcebergDelete,
DateTimeZone dateTimeZone) {
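// Resolve the Iceberg-specific split, table handle, and projected columns from the connector-level types.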
IcebergSplit split = (IcebergSplit) connectorSplit;
IcebergTableHandle table = (IcebergTableHandle) connectorTable;
List<IcebergColumnHandle> icebergColumns =
columns.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList());
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
FileIO fileIO = fileSystem.toFileIo();
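// A dummy scan task carries the data file path and its delete files so the Iceberg delete filter can be applied to this split.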
FileScanTask dummyFileScanTask = new DummyFileScanTask(split.getPath(), split.getDeletes());
Schema tableSchema = SchemaParser.fromJson(table.getTableSchemaJson());
// Creating a DeleteFilter with an empty requested schema ensures deleteFilterRequiredSchema
// contains only the columns the filter itself needs.
List<IcebergColumnHandle> deleteFilterRequiredSchema =
getColumns(
useIcebergDelete
? new TrinoDeleteFilter(dummyFileScanTask, tableSchema, ImmutableList.of(), fileIO)
.requiredSchema()
: tableSchema,
typeManager);
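// Rebuild the partition spec from JSON and derive the partition column types needed to deserialize the partition data.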
PartitionSpec partitionSpec =
PartitionSpecParser.fromJson(tableSchema, split.getPartitionSpecJson());
org.apache.iceberg.types.Type[] partitionColumnTypes =
partitionSpec.fields().stream()
.map(field -> field.transform().getResultType(tableSchema.findType(field.sourceId())))
.toArray(org.apache.iceberg.types.Type[]::new);
PartitionData partitionData =
PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes);
Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec);
// For mixed-format tables, merge the constant column values (idToConstant) into the partition keys.
if (idToConstant != null) {
partitionKeys =
ImmutableMap.<Integer, Optional<String>>builder()
.putAll(partitionKeys)
.putAll(idToConstant)
.buildOrThrow();
}
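// The reader must produce the requested columns plus any additional columns the delete filter depends on.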
ImmutableList.Builder<IcebergColumnHandle> requiredColumnsBuilder = ImmutableList.builder();
requiredColumnsBuilder.addAll(icebergColumns);
deleteFilterRequiredSchema.stream()
.filter(column -> !icebergColumns.contains(column))
.forEach(requiredColumnsBuilder::add);
List<IcebergColumnHandle> requiredColumns = requiredColumnsBuilder.build();
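// Combine the table's unenforced predicate with the current dynamic filter; an unsatisfiable predicate short-circuits to an empty source.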
TupleDomain<IcebergColumnHandle> effectivePredicate =
table
.getUnenforcedPredicate()
.intersect(
dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast))
.simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);
if (effectivePredicate.isNone()) {
return new EmptyPageSource();
}
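// Trust the file size recorded in the split metadata when the session allows it, avoiding an extra file system call.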
TrinoInputFile inputFile =
isUseFileSizeFromMetadata(session)
? fileSystem.newInputFile(split.getPath(), split.getFileSize())
: fileSystem.newInputFile(split.getPath());
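// Open the data file as a page source for the split's file format, reading only the required columns.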
IcebergPageSourceProvider.ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions =
createDataPageSource(
session,
fileSystem,
inputFile,
split.getStart(),
split.getLength(),
split.getFileRecordCount(),
partitionSpec.specId(),
split.getPartitionDataJson(),
split.getFileFormat(),
tableSchema,
requiredColumns,
effectivePredicate,
table.getNameMappingJson().map(NameMappingParser::fromJson),
partitionKeys,
dateTimeZone);
ReaderPageSource dataPageSource = readerPageSourceWithRowPositions.getReaderPageSource();
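// If the reader returned a pruned or reordered set of columns, adapt its pages back to the required column layout.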
Optional<ReaderProjectionsAdapter> projectionsAdapter =
dataPageSource
.getReaderColumns()
.map(
readerColumns ->
new ReaderProjectionsAdapter(
requiredColumns,
readerColumns,
column -> ((IcebergColumnHandle) column).getType(),
IcebergPageSourceProvider::applyProjection));
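// Build the delete filter against the full set of required columns so deleted rows can be filtered from the data pages.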
DeleteFilter<TrinoRow> deleteFilter =
new TrinoDeleteFilter(dummyFileScanTask, tableSchema, requiredColumns, fileIO);
Optional<PartitionData> partition =
partitionSpec.isUnpartitioned() ? Optional.empty() : Optional.of(partitionData);
LocationProvider locationProvider =
getLocationProvider(
table.getSchemaTableName(), table.getTableLocation(), table.getStorageProperties());
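// Lazily supplies a sink that writes position delete files for rows deleted from this data file.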
Supplier<IcebergPositionDeletePageSink> positionDeleteSink =
() ->
new IcebergPositionDeletePageSink(
split.getPath(),
partitionSpec,
partition,
locationProvider,
fileWriterFactory,
fileSystem,
jsonCodec,
session,
split.getFileFormat(),
table.getStorageProperties(),
split.getFileRecordCount());
return new IcebergPageSource(
icebergColumns,
requiredColumns,
dataPageSource.get(),
projectionsAdapter,
// For compatibility with Iceberg 0.12, the delete filter is gated on useIcebergDelete
// rather than on deleteFilter.hasPosDeletes() || deleteFilter.hasEqDeletes().
useIcebergDelete ? Optional.of(deleteFilter) : Optional.empty(),
positionDeleteSink);
}