in amoro-format-mixed/amoro-mixed-trino/src/main/java/org/apache/amoro/trino/unkeyed/IcebergPageSourceProvider.java [479:679]
/**
 * Opens an ORC file for an Iceberg split and builds a page source that yields the requested
 * columns, synthesizing metadata/virtual columns (partition values, $path, $file_modified_time,
 * row position, merge columns) as constant/position adaptations rather than file reads.
 *
 * <p>On any failure the ORC data source is closed before the exception is rethrown; I/O errors
 * are wrapped in a {@code TrinoException} carrying the split location/offset/length.
 */
private static ReaderPageSourceWithRowPositions createOrcPageSource(
TrinoInputFile inputFile,
long start,
long length,
long fileRecordCount,
int partitionSpecId,
String partitionData,
List<IcebergColumnHandle> columns,
TupleDomain<IcebergColumnHandle> effectivePredicate,
OrcReaderOptions options,
FileFormatDataSourceStats stats,
TypeManager typeManager,
Optional<NameMapping> nameMapping,
Map<Integer, Optional<String>> partitionKeys) {
// Held outside the try so the catch block can close it on failure.
OrcDataSource orcDataSource = null;
try {
orcDataSource = new TrinoOrcDataSource(inputFile, options, stats);
OrcReader reader =
OrcReader.createOrcReader(orcDataSource, options)
.orElseThrow(() -> new TrinoException(ICEBERG_BAD_DATA, "ORC file is zero length"));
List<OrcColumn> fileColumns = reader.getRootColumn().getNestedColumns();
// Files written without Iceberg field IDs (e.g. imported data) get IDs assigned from the
// table's name mapping so they can be matched to Iceberg columns below.
if (nameMapping.isPresent() && !hasIds(reader.getRootColumn())) {
fileColumns =
fileColumns.stream()
.map(
orcColumn ->
setMissingFieldIds(
orcColumn,
nameMapping.get(),
ImmutableList.of(orcColumn.getColumnName())))
.collect(toImmutableList());
}
// Iceberg field ID -> physical ORC column; lookups below return null for columns absent
// from this particular file (schema evolution).
Map<Integer, OrcColumn> fileColumnsByIcebergId = mapIdsToOrcFileColumns(fileColumns);
TupleDomainOrcPredicateBuilder predicateBuilder =
TupleDomainOrcPredicate.builder().setBloomFiltersEnabled(options.isBloomFiltersEnabled());
Map<IcebergColumnHandle, Domain> effectivePredicateDomains =
effectivePredicate
.getDomains()
.orElseThrow(() -> new IllegalArgumentException("Effective predicate is none"));
// Dereference projections (e.g. struct field access) down to their base columns.
Optional<ReaderColumns> columnProjections = projectColumns(columns);
// Base field ID -> all projection paths requested under that base column; used to prune
// the ORC read to only the needed nested fields.
Map<Integer, List<List<Integer>>> projectionsByFieldId =
columns.stream()
.collect(
groupingBy(
column -> column.getBaseColumnIdentity().getId(),
mapping(IcebergColumnHandle::getPath, toUnmodifiableList())));
List<IcebergColumnHandle> readColumns =
columnProjections
.map(
readerColumns ->
(List<IcebergColumnHandle>)
readerColumns.get().stream()
.map(IcebergColumnHandle.class::cast)
.collect(toImmutableList()))
.orElse(columns);
List<OrcColumn> fileReadColumns = new ArrayList<>(readColumns.size());
List<Type> fileReadTypes = new ArrayList<>(readColumns.size());
List<ProjectedLayout> projectedLayouts = new ArrayList<>(readColumns.size());
// One adaptation per requested column, describing how OrcPageSource materializes it:
// constant, null, row-position counter, or a real source column read from the file.
List<ColumnAdaptation> columnAdaptations = new ArrayList<>(readColumns.size());
for (IcebergColumnHandle column : readColumns) {
verify(column.isBaseColumn(), "Column projections must be based from a root column");
OrcColumn orcColumn = fileColumnsByIcebergId.get(column.getId());
if (column.isIsDeletedColumn()) {
// Rows surfaced by this reader are never deleted; constant false.
columnAdaptations.add(
ColumnAdaptation.constantColumn(nativeValueToBlock(BOOLEAN, false)));
} else if (partitionKeys.containsKey(column.getId())) {
// Identity-partitioned column: value comes from partition metadata, not the file.
Type trinoType = column.getType();
columnAdaptations.add(
ColumnAdaptation.constantColumn(
nativeValueToBlock(
trinoType,
deserializePartitionValue(
trinoType,
partitionKeys.get(column.getId()).orElse(null),
column.getName()))));
} else if (column.isPathColumn()) {
columnAdaptations.add(
ColumnAdaptation.constantColumn(
nativeValueToBlock(FILE_PATH.getType(), utf8Slice(inputFile.location()))));
} else if (column.isFileModifiedTimeColumn()) {
columnAdaptations.add(
ColumnAdaptation.constantColumn(
nativeValueToBlock(
FILE_MODIFIED_TIME.getType(),
packDateTimeWithZone(inputFile.modificationTime(), UTC_KEY))));
} else if (column.isUpdateRowIdColumn() || column.isMergeRowIdColumn()) {
// $row_id is a composite of multiple physical columns. It is assembled by the
// IcebergPageSource
columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
} else if (column.isRowPositionColumn()) {
columnAdaptations.add(ColumnAdaptation.positionColumn());
} else if (column.getId() == TRINO_MERGE_FILE_RECORD_COUNT) {
columnAdaptations.add(
ColumnAdaptation.constantColumn(
nativeValueToBlock(column.getType(), fileRecordCount)));
} else if (column.getId() == TRINO_MERGE_PARTITION_SPEC_ID) {
columnAdaptations.add(
ColumnAdaptation.constantColumn(
nativeValueToBlock(column.getType(), (long) partitionSpecId)));
} else if (column.getId() == TRINO_MERGE_PARTITION_DATA) {
columnAdaptations.add(
ColumnAdaptation.constantColumn(
nativeValueToBlock(column.getType(), utf8Slice(partitionData))));
} else if (orcColumn != null) {
// Regular data column physically present in the file.
Type readType = getOrcReadType(column.getType(), typeManager);
// UUID columns must carry the iceberg.binary-type=UUID attribute; reject mismatched
// binary data rather than silently misinterpreting it.
if (column.getType() == UUID
&& !"UUID".equals(orcColumn.getAttributes().get(ICEBERG_BINARY_TYPE))) {
throw new TrinoException(
ICEBERG_BAD_DATA,
format(
"Expected ORC column for UUID data to be annotated with %s=UUID: %s",
ICEBERG_BINARY_TYPE, orcColumn));
}
List<List<Integer>> fieldIdProjections = projectionsByFieldId.get(column.getId());
ProjectedLayout projectedLayout =
IcebergPageSourceProvider.IcebergOrcProjectedLayout.createProjectedLayout(
orcColumn, fieldIdProjections);
int sourceIndex = fileReadColumns.size();
columnAdaptations.add(ColumnAdaptation.sourceColumn(sourceIndex));
fileReadColumns.add(orcColumn);
fileReadTypes.add(readType);
projectedLayouts.add(projectedLayout);
// Register predicate domains whose base column is this one, keyed by the physical
// ORC column id, so stripes/row groups can be skipped.
for (Map.Entry<IcebergColumnHandle, Domain> domainEntry :
effectivePredicateDomains.entrySet()) {
IcebergColumnHandle predicateColumn = domainEntry.getKey();
OrcColumn predicateOrcColumn = fileColumnsByIcebergId.get(predicateColumn.getId());
if (predicateOrcColumn != null
&& column.getColumnIdentity().equals(predicateColumn.getBaseColumnIdentity())) {
predicateBuilder.addColumn(predicateOrcColumn.getColumnId(), domainEntry.getValue());
}
}
} else {
// Column added after this file was written (schema evolution): read as nulls.
columnAdaptations.add(ColumnAdaptation.nullColumn(column.getType()));
}
}
AggregatedMemoryContext memoryUsage = newSimpleAggregatedMemoryContext();
OrcDataSourceId orcDataSourceId = orcDataSource.getId();
OrcRecordReader recordReader =
reader.createRecordReader(
fileReadColumns,
fileReadTypes,
projectedLayouts,
predicateBuilder.build(),
start,
length,
UTC,
memoryUsage,
INITIAL_BATCH_SIZE,
exception -> handleException(orcDataSourceId, exception),
new IcebergPageSourceProvider.IdBasedFieldMapperFactory(readColumns));
// Row-position bounds are not computed here, hence the two empty Optionals.
return new ReaderPageSourceWithRowPositions(
new ReaderPageSource(
new OrcPageSource(
recordReader,
columnAdaptations,
orcDataSource,
Optional.empty(),
Optional.empty(),
memoryUsage,
stats,
reader.getCompressionKind()),
columnProjections),
Optional.empty(),
Optional.empty());
} catch (IOException | RuntimeException e) {
// Best-effort cleanup: close the data source, attaching any secondary failure as a
// suppressed exception (guarding against self-suppression).
if (orcDataSource != null) {
try {
orcDataSource.close();
} catch (IOException ex) {
if (!e.equals(ex)) {
e.addSuppressed(ex);
}
}
}
// TrinoExceptions already carry a proper error code; rethrow untouched.
if (e instanceof TrinoException) {
throw (TrinoException) e;
}
String message =
format(
"Error opening Iceberg split %s (offset=%s, length=%s): %s",
inputFile.location(), start, length, e.getMessage());
// HDFS missing-block failures get a distinct error code for diagnosability.
if (e instanceof BlockMissingException) {
throw new TrinoException(ICEBERG_MISSING_DATA, message, e);
}
throw new TrinoException(ICEBERG_CANNOT_OPEN_SPLIT, message, e);
}
}