public ConnectorPageSource createPageSource()

in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [268:401]


  public ConnectorPageSource createPageSource(
      ConnectorTransactionHandle transaction,
      ConnectorSession session,
      ConnectorSplit connectorSplit,
      ConnectorTableHandle connectorTable,
      List<ColumnHandle> columns,
      DynamicFilter dynamicFilter,
      Map<Integer, Optional<String>> idToConstant,
      boolean useIcebergDelete,
      DateTimeZone dateTimeZone) {
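    // Narrow the generic connector handles to their Iceberg-specific implementations.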
    IcebergSplit split = (IcebergSplit) connectorSplit;
    IcebergTableHandle table = (IcebergTableHandle) connectorTable;

    List<IcebergColumnHandle> icebergColumns =
        columns.stream().map(IcebergColumnHandle.class::cast).collect(toImmutableList());

    TrinoFileSystem fileSystem = fileSystemFactory.create(session);
    FileIO fileIO = fileSystem.toFileIo();
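    // Wrap the split's data file path and delete files in a synthetic FileScanTask
    // so a TrinoDeleteFilter can be built from the split alone.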
    FileScanTask dummyFileScanTask = new DummyFileScanTask(split.getPath(), split.getDeletes());
    Schema tableSchema = SchemaParser.fromJson(table.getTableSchemaJson());
    // Creating a DeleteFilter with no requestedSchema ensures that
    // `deleteFilterRequiredSchema` contains only the columns needed by the filter.
    List<IcebergColumnHandle> deleteFilterRequiredSchema =
        getColumns(
            useIcebergDelete
                ? new TrinoDeleteFilter(dummyFileScanTask, tableSchema, ImmutableList.of(), fileIO)
                    .requiredSchema()
                : tableSchema,
            typeManager);

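    // Rebuild the partition spec and partition values from the JSON carried on the split,
    // then derive the per-column partition keys.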
    PartitionSpec partitionSpec =
        PartitionSpecParser.fromJson(tableSchema, split.getPartitionSpecJson());
    org.apache.iceberg.types.Type[] partitionColumnTypes =
        partitionSpec.fields().stream()
            .map(field -> field.transform().getResultType(tableSchema.findType(field.sourceId())))
            .toArray(org.apache.iceberg.types.Type[]::new);
    PartitionData partitionData =
        PartitionData.fromJson(split.getPartitionDataJson(), partitionColumnTypes);
    Map<Integer, Optional<String>> partitionKeys = getPartitionKeys(partitionData, partitionSpec);
    // For mixed-format tables, merge the extra id-to-constant column mappings into the
    // partition keys so those columns are populated as constants when reading.
    if (idToConstant != null) {
      partitionKeys =
          ImmutableMap.<Integer, Optional<String>>builder()
              .putAll(partitionKeys)
              .putAll(idToConstant)
              .buildOrThrow();
    }

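    // The reader must produce the projected columns plus any extra columns required by
    // the delete filter.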
    ImmutableList.Builder<IcebergColumnHandle> requiredColumnsBuilder = ImmutableList.builder();
    requiredColumnsBuilder.addAll(icebergColumns);
    deleteFilterRequiredSchema.stream()
        .filter(column -> !icebergColumns.contains(column))
        .forEach(requiredColumnsBuilder::add);
    List<IcebergColumnHandle> requiredColumns = requiredColumnsBuilder.build();

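    // Combine the table's unenforced predicate with the current dynamic filter; if the
    // result is unsatisfiable, no rows can match and an empty page source suffices.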
    TupleDomain<IcebergColumnHandle> effectivePredicate =
        table
            .getUnenforcedPredicate()
            .intersect(
                dynamicFilter.getCurrentPredicate().transformKeys(IcebergColumnHandle.class::cast))
            .simplify(ICEBERG_DOMAIN_COMPACTION_THRESHOLD);
    if (effectivePredicate.isNone()) {
      return new EmptyPageSource();
    }

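    // Use the file size recorded in table metadata when the session allows it, avoiding a
    // file-system lookup of the size.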
    TrinoInputFile inputFile =
        isUseFileSizeFromMetadata(session)
            ? fileSystem.newInputFile(split.getPath(), split.getFileSize())
            : fileSystem.newInputFile(split.getPath());

    IcebergPageSourceProvider.ReaderPageSourceWithRowPositions readerPageSourceWithRowPositions =
        createDataPageSource(
            session,
            fileSystem,
            inputFile,
            split.getStart(),
            split.getLength(),
            split.getFileRecordCount(),
            partitionSpec.specId(),
            split.getPartitionDataJson(),
            split.getFileFormat(),
            tableSchema, // already parsed from table.getTableSchemaJson() above
            requiredColumns,
            effectivePredicate,
            table.getNameMappingJson().map(NameMappingParser::fromJson),
            partitionKeys,
            dateTimeZone);
    ReaderPageSource dataPageSource = readerPageSourceWithRowPositions.getReaderPageSource();

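    // If the reader prunes or reorders columns, adapt its output back to the expected
    // required-column layout.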
    Optional<ReaderProjectionsAdapter> projectionsAdapter =
        dataPageSource
            .getReaderColumns()
            .map(
                readerColumns ->
                    new ReaderProjectionsAdapter(
                        requiredColumns,
                        readerColumns,
                        column -> ((IcebergColumnHandle) column).getType(),
                        IcebergPageSourceProvider::applyProjection));

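    // Build the delete filter against the full set of required columns; it is applied
    // below only when useIcebergDelete is set.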
    DeleteFilter<TrinoRow> deleteFilter =
        new TrinoDeleteFilter(dummyFileScanTask, tableSchema, requiredColumns, fileIO);

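    // Resolve the partition (if any) and the location provider used when writing
    // position delete files.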
    Optional<PartitionData> partition =
        partitionSpec.isUnpartitioned() ? Optional.empty() : Optional.of(partitionData);
    LocationProvider locationProvider =
        getLocationProvider(
            table.getSchemaTableName(), table.getTableLocation(), table.getStorageProperties());
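    // Lazily create a sink that writes position delete files for rows deleted from this
    // split's data file.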
    Supplier<IcebergPositionDeletePageSink> positionDeleteSink =
        () ->
            new IcebergPositionDeletePageSink(
                split.getPath(),
                partitionSpec,
                partition,
                locationProvider,
                fileWriterFactory,
                fileSystem,
                jsonCodec,
                session,
                split.getFileFormat(),
                table.getStorageProperties(),
                split.getFileRecordCount());

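    // Assemble the final page source: data pages, optional projection adaptation,
    // optional delete filtering, and a supplier for position delete writes.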
    return new IcebergPageSource(
        icebergColumns,
        requiredColumns,
        dataPageSource.get(),
        projectionsAdapter,
        // For compatibility with Iceberg 0.12, deletes are applied only when useIcebergDelete
        // is set, rather than filtering on filter.hasPosDeletes() / filter.hasEqDeletes().
        useIcebergDelete ? Optional.of(deleteFilter) : Optional.empty(),
        positionDeleteSink);
  }