in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [863:1074]
private static ReaderPageSourceWithRowPositions createParquetPageSource(
TrinoInputFile inputFile,
long start,
long length,
long fileRecordCount,
int partitionSpecId,
String partitionData,
List<IcebergColumnHandle> regularColumns,
ParquetReaderOptions options,
TupleDomain<IcebergColumnHandle> effectivePredicate,
FileFormatDataSourceStats fileFormatDataSourceStats,
Optional<NameMapping> nameMapping,
Map<Integer, Optional<String>> partitionKeys,
DateTimeZone dateTimeZone) {
AggregatedMemoryContext memoryContext = newSimpleAggregatedMemoryContext();
ParquetDataSource dataSource = null;
try {
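// Open the file and read the Parquet footer to obtain the file metadata and schema.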
dataSource = new TrinoParquetDataSource(inputFile, options, fileFormatDataSourceStats);
ParquetMetadata parquetMetadata = MetadataReader.readFooter(dataSource, Optional.empty());
FileMetaData fileMetaData = parquetMetadata.getFileMetaData();
MessageType fileSchema = fileMetaData.getSchema();
if (nameMapping.isPresent() && !ParquetSchemaUtil.hasIds(fileSchema)) {
// NameMapping conversion is necessary because MetadataReader converts all column names to
// lowercase and NameMapping is case sensitive
fileSchema =
ParquetSchemaUtil.applyNameMapping(fileSchema, convertToLowercase(nameMapping.get()));
}
// Mapping from Iceberg field ID to Parquet fields.
Map<Integer, org.apache.parquet.schema.Type> parquetIdToField =
fileSchema.getFields().stream()
.filter(field -> field.getId() != null)
.collect(toImmutableMap(field -> field.getId().intValue(), Function.identity()));
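// When dereference pushdown produced projected columns, map them back to their base
// columns; only the base columns are read from the file.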
Optional<ReaderColumns> columnProjections = projectColumns(regularColumns);
List<IcebergColumnHandle> readColumns =
columnProjections
.map(
readerColumns ->
(List<IcebergColumnHandle>)
readerColumns.get().stream()
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList()))
.orElse(regularColumns);
List<org.apache.parquet.schema.Type> parquetFields =
readColumns.stream()
.map(column -> parquetIdToField.get(column.getId()))
.collect(toList());
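// Build the pruned Parquet schema containing only the requested columns that exist in
// this file, and derive the predicate used for row-group pruning.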
MessageType requestedSchema =
new MessageType(
fileSchema.getName(),
parquetFields.stream().filter(Objects::nonNull).collect(toImmutableList()));
Map<List<String>, ColumnDescriptor> descriptorsByPath =
getDescriptors(fileSchema, requestedSchema);
TupleDomain<ColumnDescriptor> parquetTupleDomain =
getParquetTupleDomain(descriptorsByPath, effectivePredicate);
TupleDomainParquetPredicate parquetPredicate =
buildPredicate(requestedSchema, parquetTupleDomain, descriptorsByPath, dateTimeZone);
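// Select the row groups that belong to this split and may contain matching rows,
// tracking the file row positions covered by the selected groups.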
long nextStart = 0;
Optional<Long> startRowPosition = Optional.empty();
Optional<Long> endRowPosition = Optional.empty();
ImmutableList.Builder<Long> blockStarts = ImmutableList.builder();
List<BlockMetaData> blocks = new ArrayList<>();
for (BlockMetaData block : parquetMetadata.getBlocks()) {
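// A row group belongs to this split if the offset of its first data page falls within
// [start, start + length).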
long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
if (start <= firstDataPage
&& firstDataPage < start + length
&& predicateMatches(
parquetPredicate,
block,
dataSource,
descriptorsByPath,
parquetTupleDomain,
Optional.empty(),
Optional.empty(),
dateTimeZone,
ICEBERG_DOMAIN_COMPACTION_THRESHOLD)) {
blocks.add(block);
blockStarts.add(nextStart);
if (startRowPosition.isEmpty()) {
startRowPosition = Optional.of(nextStart);
}
endRowPosition = Optional.of(nextStart + block.getRowCount());
}
nextStart += block.getRowCount();
}
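// Assemble the output columns: metadata and partition columns are populated as constants,
// while physical columns are delegated to channels of the underlying Parquet reader.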
MessageColumnIO messageColumnIO = getColumnIO(fileSchema, requestedSchema);
ConstantPopulatingPageSource.Builder constantPopulatingPageSourceBuilder =
ConstantPopulatingPageSource.builder();
int parquetSourceChannel = 0;
ImmutableList.Builder<ParquetReaderColumn> parquetReaderColumnBuilder =
ImmutableList.builder();
for (int columnIndex = 0; columnIndex < readColumns.size(); columnIndex++) {
IcebergColumnHandle column = readColumns.get(columnIndex);
if (column.isIsDeletedColumn()) {
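// Rows read from a data file are never deleted, so the is-deleted metadata column is a
// constant false.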
constantPopulatingPageSourceBuilder.addConstantColumn(nativeValueToBlock(BOOLEAN, false));
} else if (partitionKeys.containsKey(column.getId())) {
Type trinoType = column.getType();
constantPopulatingPageSourceBuilder.addConstantColumn(
nativeValueToBlock(
trinoType,
deserializePartitionValue(
trinoType,
partitionKeys.get(column.getId()).orElse(null),
column.getName())));
} else if (column.isPathColumn()) {
constantPopulatingPageSourceBuilder.addConstantColumn(
nativeValueToBlock(FILE_PATH.getType(), utf8Slice(inputFile.location())));
} else if (column.isFileModifiedTimeColumn()) {
constantPopulatingPageSourceBuilder.addConstantColumn(
nativeValueToBlock(
FILE_MODIFIED_TIME.getType(),
packDateTimeWithZone(inputFile.modificationTime(), UTC_KEY)));
} else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
// $row_id is a composite of multiple physical columns; it is assembled by the
// IcebergPageSource
parquetReaderColumnBuilder.add(
new ParquetReaderColumn(column.getType(), Optional.empty(), false));
constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel);
parquetSourceChannel++;
} else if (column.isRowPositionColumn()) {
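// The row position column is produced by the Parquet reader itself; the trailing flag
// marks the channel as supplying row indexes.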
parquetReaderColumnBuilder.add(new ParquetReaderColumn(BIGINT, Optional.empty(), true));
constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel);
parquetSourceChannel++;
} else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
constantPopulatingPageSourceBuilder.addConstantColumn(
nativeValueToBlock(column.getType(), fileRecordCount));
} else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
constantPopulatingPageSourceBuilder.addConstantColumn(
nativeValueToBlock(column.getType(), (long) partitionSpecId));
} else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
constantPopulatingPageSourceBuilder.addConstantColumn(
nativeValueToBlock(column.getType(), utf8Slice(partitionData)));
} else {
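// Regular data column: read it from Parquet when the field exists in this file,
// otherwise emit nulls.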
org.apache.parquet.schema.Type parquetField = parquetFields.get(columnIndex);
Type trinoType = column.getBaseType();
if (parquetField == null) {
parquetReaderColumnBuilder.add(
new ParquetReaderColumn(trinoType, Optional.empty(), false));
} else {
// The top level columns are already mapped by name/id appropriately.
ColumnIO columnIO = messageColumnIO.getChild(parquetField.getName());
parquetReaderColumnBuilder.add(
new ParquetReaderColumn(
trinoType,
IcebergParquetColumnIOConverter.constructField(
new FieldContext(trinoType, column.getColumnIdentity()), columnIO),
false));
}
constantPopulatingPageSourceBuilder.addDelegateColumn(parquetSourceChannel);
parquetSourceChannel++;
}
}
List<ParquetReaderColumn> parquetReaderColumns = parquetReaderColumnBuilder.build();
ParquetDataSourceId dataSourceId = dataSource.getId();
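// Create the Parquet reader over the selected row groups; the constant-populating page
// source built above fills in the non-physical columns around the delegated channels.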
ParquetReader parquetReader =
new ParquetReader(
Optional.ofNullable(fileMetaData.getCreatedBy()),
getParquetReaderFields(parquetReaderColumns),
blocks,
blockStarts.build(),
dataSource,
dateTimeZone,
memoryContext,
options,
exception -> handleException(dataSourceId, exception));
return new ReaderPageSourceWithRowPositions(
new ReaderPageSource(
constantPopulatingPageSourceBuilder.build(
new ParquetPageSource(parquetReader, parquetReaderColumns)),
columnProjections),
startRowPosition,
endRowPosition);
} catch (IOException | RuntimeException e) {
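// Close the data source, keep the original failure as the primary exception, and map
// known Parquet error types to the corresponding Iceberg error codes.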
try {
if (dataSource != null) {
dataSource.close();
}
} catch (IOException ex) {
if (!e.equals(ex)) {
e.addSuppressed(ex);
}
}
if (e instanceof TrinoException) {
throw (TrinoException) e;
}
String message =
format(
"Error opening Iceberg split %s (offset=%s, length=%s): %s",
inputFile.location(), start, length, e.getMessage());
if (e instanceof ParquetCorruptionException) {
throw new TrinoException(ICEBERG_BAD_DATA, message, e);
}
if (e instanceof BlockMissingException) {
throw new TrinoException(ICEBERG_MISSING_DATA, message, e);
}
throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, message, e);
}
}