private static ReaderPageSourceWithRowPositions createParquetPageSource()

in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [863:1074]


  private static ReaderPageSourceWithRowPositions createParquetPageSource(
      TrinoInputFile inputFile,
      long start,
      long length,
      long fileRecordCount,
      int partitionSpecId,
      String partitionData,
      List<IcebergColumnHandle> regularColumns,
      ParquetReaderOptions options,
      TupleDomain<IcebergColumnHandle> effectivePredicate,
      FileFormatDataSourceStats fileFormatDataSourceStats,
      Optional<NameMapping> nameMapping,
      Map<Integer, Optional<String>> partitionKeys,
      DateTimeZone dateTimeZone) {
    AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();

    ParquetDataSource dataSource = null;
    try {
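      // Open the file through Trino's Parquet data source abstraction and read the file footer.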
      dataSource = new TrinoParquetDataSource(inputFile, options, fileFormatDataSourceStats);
      ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
      FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
      MessageType fileSchema = fileMetaData.getSchema();
      if (nameMapping.isPresent() && !ParquetSchemaUtil.hasIds(fileSchema)) {
        // NameMapping conversion is necessary because MetadataReader converts all column names
        // to lowercase and NameMapping is case-sensitive
        fileSchema =
            ParquetSchemaUtil.applyNameMapping(fileSchema, convertToLowercase(nameMapping.get()));
      }

      // Mapping from Iceberg field ID to the corresponding Parquet field.
      Map<Integer, org.apache.parquet.schema.Type> parquetIdToField =
          fileSchema.getFields().stream()
              .filter(field -> field.getId() != null)
              .collect(toImmutableMap(field -> field.getId().intValue(), Function.identity()));

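      // Collapse dereference projections to base columns so each base column is read only once.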
      Optional<ReaderColumns> columnProjections = projectColumns(regularColumns);
      List<IcebergColumnHandle> readColumns =
          columnProjections
              .map(
                  readerColumns ->
                      (List<IcebergColumnHandle>)
                          readerColumns.get().stream()
                              .map(IcebergColumnHandle.class::cast)
                              .collect(toImmutableList()))
              .orElse(regularColumns);

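      // Resolve each read column to its Parquet field by Iceberg field ID; columns missing from
      // the file resolve to null.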
      List<org.apache.parquet.schema.Type> parquetFields =
          readColumns.stream()
              .map(column -> parquetIdToField.get(column.getId()))
              .collect(toList());

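      // Prune the file schema down to the requested fields, then derive the Parquet tuple domain
      // and predicate used for row-group filtering.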
      MessageType requestedSchema =
          new MessageType(
              fileSchema.getName(),
              parquetFields.stream().filter(Objects::nonNull).collect(toImmutableList()));
      Map<List<String>, ColumnDescriptor> descriptorsByPath =
          getDescriptors(fileSchema, requestedSchema);
      TupleDomain<ColumnDescriptor> parquetTupleDomain =
          getParquetTupleDomain(descriptorsByPath, effectivePredicate);
      TupleDomainParquetPredicate parquetPredicate =
          buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, dateTimeZone);

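      // Keep only the row groups whose first data page lies within this split and that may match
      // the predicate, tracking the range of file row positions the selected groups cover.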
      long nextStart = 0;
      Optional<Long> startRowPosition = Optional.empty();
      Optional<Long> endRowPosition = Optional.empty();
      ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
      List<BlockMetaData> blocks = new ArrayList<>();
      for (BlockMetaData block : parquetMetadata.getBlocks()) {
        long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
        if (start <= firstDataPage
            && firstDataPage < start + length
            && predicateMatches(
                parquetPredicate,
                block,
                dataSource,
                descriptorsByPath,
                parquetTupleDomain,
                Optional.empty(),
                Optional.empty(),
                dateTimeZone,
                ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
          blocks.add(block);
          blockStarts.add(nextStart);
          if (startRowPosition.isEmpty()) {
            startRowPosition = Optional.of(nextStart);
          }
          endRowPosition = Optional.of(nextStart + block.getRowCount());
        }
        nextStart += block.getRowCount();
      }

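      // Build the column IO for the requested schema and a page source builder that fills in
      // constant (partition/metadata) columns while delegating data columns to the Parquet reader.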
      MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);

      ConstantPopulatingPageSource.Builder constantPopulatingPageSourceBuilder =
          ConstantPopulatingPageSource.builder();
      int parquetSourceChannel = 0;

      ImmutableList.Builder<ParquetReaderColumn> parquetReaderColumnBuilder =
          ImmutableList.builder();
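      // For each requested column, either emit a constant block (deleted/partition/path/metadata
      // columns) or register a delegate channel backed by the Parquet reader.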
      for (int columnIndex = 0; columnIndex < readColumns.size(); columnIndex++) {
        IcebergColumnHandle column = readColumns.get(columnIndex);
        if (column.isIsDeletedColumn()) {
          constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(BOOLEAN, false));
        } else if (partitionKeys.containsKey(column.getId())) {
          Type trinoType = column.getType();
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(
                  trinoType,
                  deserializePartitionValue(
                      trinoType,
                      partitionKeys.get(column.getId()).orElse(null),
                      column.getName())));
        } else if (column.isPathColumn()) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(FILE_PATH.getType(), utf8Slice(inputFile.location())));
        } else if (column.isFileModifiedTimeColumn()) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(
                  FILE_MODIFIED_TIME.getType(),
                  packDateTimeWithZone(inputFile.modificationTime(), UTC_KEY)));
        } else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
          // $row_id is a composite of multiple physical columns; it is assembled by the
          // IcebergPageSource
          parquetReaderColumnBuilder.add(
              new ParquetReaderColumn(column.getType(), Optional.empty(), false));
          constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel);
          parquetSourceChannel++;
        } else if (column.isRowPositionColumn()) {
          parquetReaderColumnBuilder.add(new ParquetReaderColumn(BIGINT, Optional.empty(), true));
          constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel);
          parquetSourceChannel++;
        } else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(column.getType(), fileRecordCount));
        } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(column.getType(), (long) partitionSpecId));
        } else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(column.getType(), utf8Slice(partitionData)));
        } else {
          org.apache.parquet.schema.Type parquetField = parquetFields.get(columnIndex);
          Type trinoType = column.getBaseType();

          if (parquetField == null) {
            parquetReaderColumnBuilder.add(
                new ParquetReaderColumn(trinoType, Optional.empty(), false));
          } else {
            // The top-level columns are already mapped appropriately by name/ID.
            ColumnIO columnIO = messageColumnIO.getChild(parquetField.getName());
            parquetReaderColumnBuilder.add(
                new ParquetReaderColumn(
                    trinoType,
                    IcebergParquetColumnIOConverter.constructField(
                        new FieldContext(trinoType, column.getColumnIdentity()), columnIO),
                    false));
          }

          constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel);
          parquetSourceChannel++;
        }
      }

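      // Assemble the Parquet reader over the selected row groups and wrap it with the
      // constant-populating page source, preserving any column projections.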
      List<ParquetReaderColumn> parquetReaderColumns = parquetReaderColumnBuilder.build();
      ParquetDataSourceId dataSourceId = dataSource.getId();
      ParquetReader parquetReader =
          new ParquetReader(
              Optional.ofNullable(fileMetaData.getCreatedBy()),
              getParquetReaderFields(parquetReaderColumns),
              blocks,
              blockStarts.build(),
              dataSource,
              dateTimeZone,
              memoryContext,
              options,
              exception -> handleException(dataSourceId, exception));
      return new ReaderPageSourceWithRowPositions(
          new ReaderPageSource(
              constantPopulatingPageSourceBuilder.build(
                  new ParquetPageSource(parquetReader, parquetReaderColumns)),
              columnProjections),
          startRowPosition,
          endRowPosition);
    } catch (IOException | RuntimeException e) {
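      // Best-effort close of the data source; suppress any secondary failure against the original.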
      try {
        if (dataSource != null) {
          dataSource.close();
        }
      } catch (IOException ex) {
        if (!e.equals(ex)) {
          e.addSuppressed(ex);
        }
      }
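      // Rethrow Trino exceptions as-is and map known corruption / missing-block failures to
      // dedicated Iceberg error codes; anything else is reported as a failure to open the split.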
      if (e instanceof TrinoException) {
        throw (TrinoException) e;
      }
      String message =
          format(
              "Error opening Iceberg split %s (offset=%s, length=%s): %s",
              inputFile.location(), start, length, e.getMessage());

      if (e instanceof ParquetCorruptionException) {
        throw new TrinoException(ICEBERG_BAD_DATA, message, e);
      }

      if (e instanceof BlockMissingException) {
        throw new TrinoException(ICEBERG_MISSING_DATA, message, e);
      }
      throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, message, e);
    }
  }