private static ReaderPageSourceWithRowPositions createOrcPageSource()

in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [479:679]


  private static ReaderPageSourceWithRowPositions createOrcPageSource(
      TrinoInputFile inputFile,
      long start,
      long length,
      long fileRecordCount,
      int partitionSpecId,
      String partitionData,
      List<IcebergColumnHandle> columns,
      TupleDomain<IcebergColumnHandle> effectivePredicate,
      OrcReaderOptions options,
      FileFormatDataSourceStats stats,
      TypeManager typeManager,
      Optional<NameMapping> nameMapping,
      Map<Integer, Optional<String>> partitionKeys) {
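    // Builds an ORC page source for one Iceberg data file: real columns are read from the
    // file, while partition values and Trino metadata/synthetic columns are supplied through
    // column adaptations without touching the file.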
    OrcDataSource orcDataSource = null;
    try {
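      // Open the file as an ORC data source; it is closed in the catch block below if setup fails.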
      orcDataSource = new TrinoOrcDataSource(inputFile, options, stats);

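      // Parse the ORC footer; a zero-length file produces no reader and is rejected as bad data.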
      OrcReader reader =
          OrcReader.createOrcReader(orcDataSource, options)
              .orElseThrow(() -> new TrinoException(ICEBERG_BAD_DATA, "ORC file is zero length"));

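      // ORC files written without Iceberg field IDs (e.g. imported or migrated data) get their
      // IDs synthesized from the table's name mapping so columns can be matched by ID below.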
      List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();
      if (nameMapping.isPresent() && !hasIds(reader.getRootColumn())) {
        fileColumns =
            fileColumns.stream()
                .map(
                    orcColumn ->
                        setMissingFieldIds(
                            orcColumn,
                            nameMapping.get(),
                            ImmutableList.of(orcColumn.getColumnName())))
                .collect(toImmutableList());
      }

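      // Index the file's ORC columns by Iceberg field ID for the per-column lookups below.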
      Map<Integer, OrcColumn> fileColumnsByIcebergId = mapIdsToOrcFileColumns(fileColumns);

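      // Convert the effective predicate into an ORC predicate used to prune stripes and row
      // groups (and to consult bloom filters when enabled).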
      TupleDomainOrcPredicateBuilder predicateBuilder =
          TupleDomainOrcPredicate.builder().setBloomFiltersEnabled(options.isBloomFiltersEnabled());
      Map<IcebergColumnHandle, Domain> effectivePredicateDomains =
          effectivePredicate
              .getDomains()
              .orElseThrow(() -> new IllegalArgumentException("Effective predicate is none"));

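      // Dereference pushdown: group the requested (possibly nested) field paths by their base
      // column so each physical column is read once with a pruned projected layout.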
      Optional<ReaderColumns> columnProjections = projectColumns(columns);
      Map<Integer, List<List<Integer>>> projectionsByFieldId =
          columns.stream()
              .collect(
                  groupingBy(
                      column -> column.getBaseColumnIdentity().getId(),
                      mapping(IcebergColumnHandle::getPath, toUnmodifiableList())));

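      // The distinct base columns that are actually read from the file.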
      List<IcebergColumnHandle> readColumns =
          columnProjections
              .map(
                  readerColumns ->
                      (List<IcebergColumnHandle>)
                          readerColumns.get().stream()
                              .map(IcebergColumnHandle.class::cast)
                              .collect(toImmutableList()))
              .orElse(columns);
      List<OrcColumn> fileReadColumns = new ArrayList<>(readColumns.size());
      List<Type> fileReadTypes = new ArrayList<>(readColumns.size());
      List<ProjectedLayout> projectedLayouts = new ArrayList<>(readColumns.size());
      List<ColumnAdaptation> columnAdaptations = new ArrayList<>(readColumns.size());

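      // Decide, per column, whether it is read from the file or synthesized as a constant,
      // positional, or null adaptation.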
      for (IcebergColumnHandle column : readColumns) {
        verify(column.isBaseColumn(), "Column projections must be based on a root column");
        OrcColumn orcColumn = fileColumnsByIcebergId.get(column.getId());

        if (column.isIsDeletedColumn()) {
          columnAdaptations.add(
              ColumnAdaptation.constantColumn(nativeValueToBlock(BOOLEAN, false)));
        } else if (partitionKeys.containsKey(column.getId())) {
          Type trinoType = column.getType();
          columnAdaptations.add(
              ColumnAdaptation.constantColumn(
                  nativeValueToBlock(
                      trinoType,
                      deserializePartitionValue(
                          trinoType,
                          partitionKeys.get(column.getId()).orElse(null),
                          column.getName()))));
        } else if (column.isPathColumn()) {
          columnAdaptations.add(
              ColumnAdaptation.constantColumn(
                  nativeValueToBlock(FILE_PATH.getType(), utf8Slice(inputFile.location()))));
        } else if (column.isFileModifiedTimeColumn()) {
          columnAdaptations.add(
              ColumnAdaptation.constantColumn(
                  nativeValueToBlock(
                      FILE_MODIFIED_TIME.getType(),
                      packDateTimeWithZone(inputFile.modificationTime(), UTC_KEY))));
        } else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
          // $row_id is a composite of multiple physical columns. It is assembled by the
          // IcebergPageSource
          columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
        } else if (column.isRowPositionColumn()) {
          columnAdaptations.add(ColumnAdaptation.positionColumn());
        } else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
          columnAdaptations.add(
              ColumnAdaptation.constantColumn(
                  nativeValueToBlock(column.getType(), fileRecordCount)));
        } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
          columnAdaptations.add(
              ColumnAdaptation.constantColumn(
                  nativeValueToBlock(column.getType(), (long) partitionSpecId)));
        } else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
          columnAdaptations.add(
              ColumnAdaptation.constantColumn(
                  nativeValueToBlock(column.getType(), utf8Slice(partitionData))));
        } else if (orcColumn != null) {
          Type readType = getOrcReadType(column.getType(), typeManager);

          if (column.getType() == UUID
              && !"UUID".equals(orcColumn.getAttributes().get(ICEBERG_BINARY_TYPE))) {
            throw new TrinoException(
                ICEBERG_BAD_DATA,
                format(
                    "Expected ORC column for UUID data to be annotated with %s=UUID: %s",
                    ICEBERG_BINARY_TYPE, orcColumn));
          }

          List<List<Integer>> fieldIdProjections = projectionsByFieldId.get(column.getId());
          ProjectedLayout projectedLayout =
              IcebergPageSourceProvider.IcebergOrcProjectedLayout.createProjectedLayout(
                  orcColumn, fieldIdProjections);

          int sourceIndex = fileReadColumns.size();
          columnAdaptations.add(ColumnAdaptation.sourceColumn(sourceIndex));
          fileReadColumns.add(orcColumn);
          fileReadTypes.add(readType);
          projectedLayouts.add(projectedLayout);

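          // Register predicate domains belonging to this base column so matching stripes and
          // row groups can be skipped during the scan.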
          for (Map.Entry<IcebergColumnHandle, Domain> domainEntry :
              effectivePredicateDomains.entrySet()) {
            IcebergColumnHandle predicateColumn = domainEntry.getKey();
            OrcColumn predicateOrcColumn = fileColumnsByIcebergId.get(predicateColumn.getId());
            if (predicateOrcColumn != null
                && column.getColumnIdentity().equals(predicateColumn.getBaseColumnIdentity())) {
              predicateBuilder.addColumn(predicateOrcColumn.getColumnId(), domainEntry.getValue());
            }
          }
        } else {
          columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
        }
      }

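      // Create the record reader over the split's [start, start + length) byte range, matching
      // file columns to read columns by field ID.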
      AggregatedMemoryContext memoryUsage = newSimpleAggregatedMemoryContext();
      OrcDataSourceId orcDataSourceId = orcDataSource.getId();
      OrcRecordReader recordReader =
          reader.createRecordReader(
              fileReadColumns,
              fileReadTypes,
              projectedLayouts,
              predicateBuilder.build(),
              start,
              length,
              UTC,
              memoryUsage,
              INITIAL_BATCH_SIZE,
              exception -> handleException(orcDataSourceId, exception),
              new IcebergPageSourceProvider.IdBasedFieldMapperFactory(readColumns));

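      // Start/end row positions are not derived for this reader, so both bounds are left empty.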
      return new ReaderPageSourceWithRowPositions(
          new ReaderPageSource(
              new OrcPageSource(
                  recordReader,
                  columnAdaptations,
                  orcDataSource,
                  Optional.empty(),
                  Optional.empty(),
                  memoryUsage,
                  stats,
                  reader.getCompressionKind()),
              columnProjections),
          Optional.empty(),
          Optional.empty());
    } catch (IOException | RuntimeException e) {
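      // Best-effort close of the data source, suppressing the close failure under the original
      // exception.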
      if (orcDataSource != null) {
        try {
          orcDataSource.close();
        } catch (IOException ex) {
          if (!e.equals(ex)) {
            e.addSuppressed(ex);
          }
        }
      }
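      // Rethrow Trino exceptions unchanged; classify everything else as missing data or a
      // split that cannot be opened.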
      if (e instanceof TrinoException) {
        throw (TrinoException) e;
      }
      String message =
          format(
              "Error opening Iceberg split %s (offset=%s, length=%s): %s",
              inputFile.location(), start, length, e.getMessage());
      if (e instanceof BlockMissingException) {
        throw new TrinoException(ICEBERG_MISSING_DATA, message, e);
      }
      throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, message, e);
    }
  }