in amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquet.java [845:950]
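/**
 * Builds a {@link CloseableIterable} over the records of the Parquet input file.
 *
 * <p>When a reader function has been supplied, the file is read through Iceberg's native Parquet
 * readers: a batched reader function selects the vectorized reader, while a row-based reader
 * function is routed through {@code AdaptHiveParquetReader} (the mixed-hive change in this class).
 * With no reader function, the method falls back to parquet-avro via {@code ParquetReadBuilder}.
 */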
public <D> CloseableIterable<D> build() {
  if (readerFunc != null || batchedReaderFunc != null) {
    ParquetReadOptions.Builder optionsBuilder;
    if (file instanceof HadoopInputFile) {
      // remove read properties already set that may conflict with this read
      Configuration conf = new Configuration(((HadoopInputFile) file).getConf());
      for (String property : READ_PROPERTIES_TO_REMOVE) {
        conf.unset(property);
      }
      optionsBuilder = HadoopReadOptions.builder(conf);
    } else {
      optionsBuilder = ParquetReadOptions.builder();
    }
    for (Map.Entry<String, String> entry : properties.entrySet()) {
      optionsBuilder.set(entry.getKey(), entry.getValue());
    }

    if (start != null) {
      optionsBuilder.withRange(start, start + length);
    }

    ParquetReadOptions options = optionsBuilder.build();
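
    // Dispatch on the configured reader: a batched reader function selects the vectorized path;
    // otherwise rows are read one at a time.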
    if (batchedReaderFunc != null) {
      return new VectorizedParquetReader<>(
          file,
          schema,
          options,
          batchedReaderFunc,
          nameMapping,
          filter,
          reuseContainers,
          caseSensitive,
          maxRecordsPerBatch);
    } else {
      // Change for mixed-hive table ⬇
      return new AdaptHiveParquetReader<>(
          file,
          schema,
          options,
          readerFunc,
          nameMapping,
          filter,
          reuseContainers,
          caseSensitive);
      // Change for mixed-hive table ⬆
    }
  }
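
  // No Iceberg reader function was supplied: fall back to reading through parquet-avro.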
  ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));

  builder.project(schema);

  if (readSupport != null) {
    builder.readSupport((ReadSupport<D>) readSupport);
  } else {
    builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL));
  }

  // default options for readers
  builder
      .set("parquet.strict.typing", "false") // allow type promotion
      .set("parquet.avro.compatible", "false") // use the new RecordReader with Utf8 support
      .set(
          "parquet.avro.add-list-element-records",
          "false"); // assume that lists use a 3-level schema
  for (Map.Entry<String, String> entry : properties.entrySet()) {
    builder.set(entry.getKey(), entry.getValue());
  }

  if (filter != null) {
    // TODO: should not need to get the schema to push down before opening the file.
    // Parquet should allow setting a filter inside its read support
    MessageType type;
    try (ParquetFileReader schemaReader = ParquetFileReader.open(ParquetIO.file(file))) {
      type = schemaReader.getFileMetaData().getSchema();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }

    Schema fileSchema = ParquetSchemaUtil.convert(type);
    builder
        .useStatsFilter()
        .useDictionaryFilter()
        .useRecordFilter(filterRecords)
        .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
  } else {
    // turn off filtering
    builder.useStatsFilter(false).useDictionaryFilter(false).useRecordFilter(false);
  }
  if (callInit) {
    builder.callInit();
  }

  if (start != null) {
    builder.withFileRange(start, start + length);
  }

  if (nameMapping != null) {
    builder.withNameMapping(nameMapping);
  }

  return new ParquetIterable<>(builder);
}
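
// A minimal usage sketch (hypothetical call site): the AdaptHiveParquet.read(...) entry point
// and the generic reader binding below are assumptions, mirroring Iceberg's Parquet.read(...)
// builder API rather than code confirmed by this excerpt.
//
//   CloseableIterable<Record> records =
//       AdaptHiveParquet.read(inputFile)
//           .project(tableSchema)
//           .createReaderFunc(
//               fileSchema -> GenericParquetReaders.buildReader(tableSchema, fileSchema))
//           .filter(Expressions.equal("id", 1))
//           .build();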