in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [1076:1194]
/**
 * Builds a page source for reading a split of an Avro data file.
 *
 * <p>Synthetic and metadata columns are populated without touching the Avro reader: {@code $path}
 * and {@code $file_modified_time} become constant blocks, the row-position column becomes a
 * row-index channel, and the merge-related pseudo columns (file record count, partition spec id,
 * partition data) become constants from the supplied split metadata. Columns absent from the Avro
 * file schema are filled with nulls. Everything else is delegated to an
 * {@code IcebergAvroPageSource} over the file's physical field order.
 *
 * @param fileSystem file system used to re-open the file through the Iceberg FileIO bridge
 * @param inputFile the Avro file backing this split
 * @param start byte offset of the split within the file
 * @param length byte length of the split
 * @param fileRecordCount total record count of the file, exposed via TRINO_MERGE_FILE_RECORD_COUNT
 * @param partitionSpecId partition spec id, exposed via TRINO_MERGE_PARTITION_SPEC_ID
 * @param partitionData serialized partition data, exposed via TRINO_MERGE_PARTITION_DATA
 * @param fileSchema Iceberg schema of the file, passed through to the Avro page source
 * @param nameMapping optional name mapping used to assign field ids when the Avro schema has none
 * @param columns columns requested by the engine (possibly containing dereference projections)
 * @return the page source together with (empty) row-position bounds
 * @throws TrinoException with ICEBERG_CANNOT_OPEN_SPLIT if the file cannot be opened or read
 */
private static ReaderPageSourceWithRowPositions createAvroPageSource(
    TrinoFileSystem fileSystem,
    TrinoInputFile inputFile,
    long start,
    long length,
    long fileRecordCount,
    int partitionSpecId,
    String partitionData,
    Schema fileSchema,
    Optional<NameMapping> nameMapping,
    List<IcebergColumnHandle> columns) {
  ConstantPopulatingPageSource.Builder constantPopulatingPageSourceBuilder =
      ConstantPopulatingPageSource.builder();
  int avroSourceChannel = 0;

  // Resolve dereference projections down to their base columns; when there are no
  // projections, read exactly the requested columns.
  Optional<ReaderColumns> columnProjections = projectColumns(columns);
  List<IcebergColumnHandle> readColumns =
      columnProjections
          .map(
              readerColumns ->
                  (List<IcebergColumnHandle>)
                      readerColumns.get().stream()
                          .map(IcebergColumnHandle.class::cast)
                          .collect(toImmutableList()))
          .orElse(columns);

  InputFile file;
  OptionalLong fileModifiedTime = OptionalLong.empty();
  try {
    file = fileSystem.toFileIo().newInputFile(inputFile.location(), inputFile.length());
    // Only query the modification time when a $file_modified_time column is actually read,
    // since fetching it may cost an extra file-system call.
    if (readColumns.stream().anyMatch(IcebergColumnHandle::isFileModifiedTimeColumn)) {
      fileModifiedTime = OptionalLong.of(inputFile.modificationTime());
    }
  } catch (IOException e) {
    throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, e);
  }

  // The column orders in the generated schema might be different from the original order.
  // Declare the raw stream as its own resource so it is closed even when the DataFileStream
  // constructor throws (e.g. on a malformed Avro header) before the reader owns the stream;
  // with the previous single-resource form that stream would leak.
  try (java.io.InputStream avroInputStream = file.newStream();
      DataFileStream<?> avroFileReader =
          new DataFileStream<>(avroInputStream, new GenericDatumReader<>())) {
    org.apache.avro.Schema avroSchema = avroFileReader.getSchema();
    List<org.apache.avro.Schema.Field> fileFields = avroSchema.getFields();
    // If the file's Avro fields carry no Iceberg field ids, derive them from the name mapping
    // so they can be matched to the requested Iceberg columns by id below.
    if (nameMapping.isPresent()
        && fileFields.stream().noneMatch(IcebergPageSourceProvider::hasId)) {
      fileFields =
          fileFields.stream()
              .map(
                  field ->
                      setMissingFieldId(field, nameMapping.get(), ImmutableList.of(field.name())))
              .collect(toImmutableList());
    }
    Map<Integer, org.apache.avro.Schema.Field> fileColumnsByIcebergId =
        mapIdsToAvroFields(fileFields);

    ImmutableList.Builder<String> columnNames = ImmutableList.builder();
    ImmutableList.Builder<Type> columnTypes = ImmutableList.builder();
    ImmutableList.Builder<Boolean> rowIndexChannels = ImmutableList.builder();
    for (IcebergColumnHandle column : readColumns) {
      verify(column.isBaseColumn(), "Column projections must be based from a root column");
      org.apache.avro.Schema.Field field = fileColumnsByIcebergId.get(column.getId());
      if (column.isPathColumn()) {
        // $path: same value for the whole split — emit as a constant block
        constantPopulatingPageSourceBuilder.addConstantColumn(
            nativeValueToBlock(FILE_PATH.getType(), utf8Slice(file.location())));
      } else if (column.isFileModifiedTimeColumn()) {
        // $file_modified_time: populated above; orElseThrow is safe because the presence of
        // this column is exactly the condition under which fileModifiedTime was set
        constantPopulatingPageSourceBuilder.addConstantColumn(
            nativeValueToBlock(
                FILE_MODIFIED_TIME.getType(),
                packDateTimeWithZone(fileModifiedTime.orElseThrow(), UTC_KEY)));
      } else if (column.isRowPositionColumn()) {
        // Row position is generated by the Avro page source itself as a row-index channel
        rowIndexChannels.add(true);
        columnNames.add(ROW_POSITION.name());
        columnTypes.add(BIGINT);
        constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel);
        avroSourceChannel++;
      } else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
        constantPopulatingPageSourceBuilder.addConstantColumn(
            nativeValueToBlock(column.getType(), fileRecordCount));
      } else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
        constantPopulatingPageSourceBuilder.addConstantColumn(
            nativeValueToBlock(column.getType(), (long) partitionSpecId));
      } else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
        constantPopulatingPageSourceBuilder.addConstantColumn(
            nativeValueToBlock(column.getType(), utf8Slice(partitionData)));
      } else if (field == null) {
        // Column does not exist in this data file (e.g. added after the file was written):
        // fill with nulls rather than failing
        constantPopulatingPageSourceBuilder.addConstantColumn(
            nativeValueToBlock(column.getType(), null));
      } else {
        // Real data column: delegate to the Avro reader
        rowIndexChannels.add(false);
        columnNames.add(column.getName());
        columnTypes.add(column.getType());
        constantPopulatingPageSourceBuilder.addDelegateColumn(avroSourceChannel);
        avroSourceChannel++;
      }
    }

    return new ReaderPageSourceWithRowPositions(
        new ReaderPageSource(
            constantPopulatingPageSourceBuilder.build(
                new IcebergAvroPageSource(
                    file,
                    start,
                    length,
                    fileSchema,
                    nameMapping,
                    columnNames.build(),
                    columnTypes.build(),
                    rowIndexChannels.build(),
                    newSimpleAggregatedMemoryContext())),
            columnProjections),
        Optional.empty(),
        Optional.empty());
  } catch (IOException e) {
    throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, e);
  }
}