private static ReaderPageSourceWithRowPositions createAvroPageSource()

in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [1076:1194]


  /**
   * Creates a page source for an Iceberg data file stored in Avro format.
   *
   * <p>The Avro file is opened once to read its embedded schema. Each requested column is then
   * matched, by Iceberg field id, against the fields actually present in the file. If a
   * {@code nameMapping} is supplied and none of the file's top-level fields carries an id, ids are
   * assigned from the name mapping before matching.
   *
   * <p>Requested columns are split into two groups:
   *
   * <ul>
   *   <li>Constant columns, filled by {@link ConstantPopulatingPageSource}: the file path, the file
   *       modification time, the merge metadata columns (file record count, partition spec id,
   *       partition data), and any column whose id is not found in the file (filled with
   *       {@code null}).
   *   <li>Delegate columns, read by {@code IcebergAvroPageSource}: the row position column and
   *       every column whose id is present in the file.
   * </ul>
   *
   * @param fileSystem file system used to obtain an Iceberg {@link InputFile} for the data file
   * @param inputFile the data file to read; its modification time is only fetched when the file
   *     modified time column is requested
   * @param start forwarded to {@code IcebergAvroPageSource} (presumably the split start offset)
   * @param length forwarded to {@code IcebergAvroPageSource} (presumably the split length)
   * @param fileRecordCount value produced for the merge file-record-count column
   * @param partitionSpecId value produced for the merge partition-spec-id column
   * @param partitionData value produced for the merge partition-data column
   * @param fileSchema Iceberg schema forwarded to {@code IcebergAvroPageSource}
   * @param nameMapping optional name mapping, applied only when the Avro file has no field ids
   * @param columns requested columns; when {@code projectColumns} reports projections, the base
   *     columns it returns are read instead
   * @return the reader page source; both row-position optionals of the result are left empty
   * @throws TrinoException with {@code ICEBERG_CANNOT_OPEN_SPLIT} if the file cannot be opened or
   *     its Avro header cannot be read
   */
  private static ReaderPageSourceWithRowPositions createAvroPageSource(
      TrinoFileSystem fileSystem,
      TrinoInputFile inputFile,
      long start,
      long length,
      long fileRecordCount,
      int partitionSpecId,
      String partitionData,
      Schema fileSchema,
      Optional<NameMapping> nameMapping,
      List<IcebergColumnHandle> columns) {
    ConstantPopulatingPageSource.Builder constantPopulatingPageSourceBuilder =
        ConstantPopulatingPageSource.builder();
    // Next channel index in the delegate (Avro) page source; only delegate columns consume one.
    int avroSourceChannel = 0;

    Optional<ReaderColumns> columnProjections = projectColumns(columns);

    List<IcebergColumnHandle> readColumns =
        columnProjections
            .map(
                readerColumns ->
                    (List<IcebergColumnHandle>)
                        readerColumns.get().stream()
                            .map(IcebergColumnHandle.class::cast)
                            .collect(toImmutableList()))
            .orElse(columns);

    InputFile file;
    OptionalLong fileModifiedTime = OptionalLong.empty();
    try {
      file = fileSystem.toFileIo().newInputFile(inputFile.location(), inputFile.length());
      // Fetch the modification time only when a column actually needs it.
      if (readColumns.stream().anyMatch(IcebergColumnHandle::isFileModifiedTimeColumn)) {
        fileModifiedTime = OptionalLong.of(inputFile.modificationTime());
      }
    } catch (IOException e) {
      throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, e);
    }

    // The file is opened here only to read its Avro schema. The column orders in that schema
    // might be different from the original order.
    //
    // The raw stream is declared as a separate resource because DataFileStream reads the header in
    // its constructor. If that read fails, no DataFileStream exists to close the stream, and it
    // would otherwise leak.
    try (java.io.InputStream avroInputStream = file.newStream();
        DataFileStream<?> avroFileReader =
            new DataFileStream<>(avroInputStream, new GenericDatumReader<>())) {
      org.apache.avro.Schema avroSchema = avroFileReader.getSchema();
      List<org.apache.avro.Schema.Field> fileFields = avroSchema.getFields();
      // Files written without field ids can still be matched by id via the table's name mapping.
      if (nameMapping.isPresent()
          && fileFields.stream().noneMatch(IcebergPageSourceProvider::hasId)) {
        fileFields =
            fileFields.stream()
                .map(
                    field ->
                        setMissingFieldId(field, nameMapping.get(), ImmutableList.of(field.name())))
                .collect(toImmutableList());
      }

      Map<Integer, org.apache.avro.Schema.Field> fileColumnsByIcebergId =
          mapIdsToAvroFields(fileFields);

      // Parallel lists describing the delegate channels, in channel order.
      ImmutableList.Builder<String> columnNames = ImmutableList.builder();
      ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
      ImmutableList.Builder<Boolean> rowIndexChannels = ImmutableList.builder();

      for (IcebergColumnHandle column : readColumns) {
        verify(column.isBaseColumn(), "Column projections must be based from a root column");
        org.apache.avro.Schema.Field field = fileColumnsByIcebergId.get(column.getId());

        if (column.isPathColumn()) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(FILE_PATH.getType(), utf8Slice(file.location())));
        } else if (column.isFileModifiedTimeColumn()) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(
                  FILE_MODIFIED_TIME.getType(),
                  packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
        } else if (column.isRowPositionColumn()) {
          // Row position is produced by the Avro page source itself, flagged as a row-index channel.
          rowIndexChannels.add(true);
          columnNames.add(ROW_POSITION.name());
          columnTypes.add(BIGINT);
          constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel);
          avroSourceChannel++;
        } else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(column.getType(), fileRecordCount));
        } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(column.getType(), (long) partitionSpecId));
        } else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(column.getType(), utf8Slice(partitionData)));
        } else if (field == null) {
          // Column id not present in this file: produce nulls.
          constantPopulatingPageSourceBuilder.addConstantColumn(
              nativeValueToBlock(column.getType(), null));
        } else {
          rowIndexChannels.add(false);
          columnNames.add(column.getName());
          columnTypes.add(column.getType());
          constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel);
          avroSourceChannel++;
        }
      }

      return new ReaderPageSourceWithRowPositions(
          new ReaderPageSource(
              constantPopulatingPageSourceBuilder.build(
                  new IcebergAvroPageSource(
                      file,
                      start,
                      length,
                      fileSchema,
                      nameMapping,
                      columnNames.build(),
                      columnTypes.build(),
                      rowIndexChannels.build(),
                      newSimpleAggregatedMemoryContext())),
              columnProjections),
          Optional.empty(),
          Optional.empty());
    } catch (IOException e) {
      throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, e);
    }
  }