in parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java [1267:1407]
public <D> CloseableIterable<D> build() {
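  // Build file decryption properties from the configured encryption key and AAD prefix;
  // an AAD prefix without an encryption key is rejected below.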
  FileDecryptionProperties fileDecryptionProperties = null;
  if (fileEncryptionKey != null) {
    byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
    byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);

    fileDecryptionProperties =
        FileDecryptionProperties.builder()
            .withFooterKey(encryptionKeyArray)
            .withAADPrefix(aadPrefixArray)
            .build();
  } else {
    Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
  }
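
  // An Iceberg reader function is configured: build ParquetReadOptions and return one of
  // Iceberg's own readers instead of going through parquet-avro.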
  if (readerFunc != null || readerFuncWithSchema != null || batchedReaderFunc != null) {
    ParquetReadOptions.Builder optionsBuilder;
    if (file instanceof HadoopInputFile) {
      // remove read properties already set that may conflict with this read
      Configuration conf = new Configuration(((HadoopInputFile) file).getConf());
      for (String property : READ_PROPERTIES_TO_REMOVE) {
        conf.unset(property);
      }

      optionsBuilder = HadoopReadOptions.builder(conf);
    } else {
      optionsBuilder = ParquetReadOptions.builder(new PlainParquetConfiguration());
    }

    for (Map.Entry<String, String> entry : properties.entrySet()) {
      optionsBuilder.set(entry.getKey(), entry.getValue());
    }

    if (start != null) {
      optionsBuilder.withRange(start, start + length);
    }

    if (fileDecryptionProperties != null) {
      optionsBuilder.withDecryption(fileDecryptionProperties);
    }

    ParquetReadOptions options = optionsBuilder.build();
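
    // Resolve the name mapping used for columns that lack Iceberg field IDs: prefer the
    // configured mapping, otherwise use an empty mapping unless the unsafe ID fallback is enabled.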
    NameMapping mapping;
    if (nameMapping != null) {
      mapping = nameMapping;
    } else if (SystemConfigs.NETFLIX_UNSAFE_PARQUET_ID_FALLBACK_ENABLED.value()) {
      mapping = null;
    } else {
      mapping = NameMapping.empty();
    }
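
    // A batched reader function selects the vectorized reader; otherwise build a row-based reader.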
    if (batchedReaderFunc != null) {
      return new VectorizedParquetReader<>(
          file,
          schema,
          options,
          batchedReaderFunc,
          mapping,
          filter,
          reuseContainers,
          caseSensitive,
          maxRecordsPerBatch);
    } else {
      Function<MessageType, ParquetValueReader<?>> readBuilder =
          readerFuncWithSchema != null
              ? fileType -> readerFuncWithSchema.apply(schema, fileType)
              : readerFunc;
      return new org.apache.iceberg.parquet.ParquetReader<>(
          file, schema, options, readBuilder, mapping, filter, reuseContainers, caseSensitive);
    }
  }
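
  // No reader function was supplied: fall back to parquet-avro ReadSupport via ParquetReadBuilder.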
  ParquetReadBuilder<D> builder = new ParquetReadBuilder<>(ParquetIO.file(file));

  builder.project(schema);

  if (readSupport != null) {
    builder.readSupport((ReadSupport<D>) readSupport);
  } else {
    builder.readSupport(new AvroReadSupport<>(ParquetAvro.DEFAULT_MODEL));
  }

  // default options for readers
  builder
      .set("parquet.strict.typing", "false") // allow type promotion
      .set("parquet.avro.compatible", "false") // use the new RecordReader with Utf8 support
      .set(
          "parquet.avro.add-list-element-records",
          "false"); // assume that lists use a 3-level schema

  for (Map.Entry<String, String> entry : properties.entrySet()) {
    builder.set(entry.getKey(), entry.getValue());
  }
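
  // With a filter, read the file schema up front so the Iceberg expression can be converted
  // to a Parquet filter and pushed down with stats, dictionary, record, and bloom filtering.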
  if (filter != null) {
    // TODO: should not need to get the schema to push down before opening the file.
    // Parquet should allow setting a filter inside its read support
    ParquetReadOptions decryptOptions =
        ParquetReadOptions.builder(new PlainParquetConfiguration())
            .withDecryption(fileDecryptionProperties)
            .build();
    MessageType type;
    try (ParquetFileReader schemaReader =
        ParquetFileReader.open(ParquetIO.file(file), decryptOptions)) {
      type = schemaReader.getFileMetaData().getSchema();
    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }

    Schema fileSchema = ParquetSchemaUtil.convert(type);
    builder
        .useStatsFilter()
        .useDictionaryFilter()
        .useRecordFilter(filterRecords)
        .useBloomFilter()
        .withFilter(ParquetFilters.convert(fileSchema, filter, caseSensitive));
  } else {
    // turn off filtering
    builder
        .useStatsFilter(false)
        .useDictionaryFilter(false)
        .useBloomFilter(false)
        .useRecordFilter(false);
  }
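
  // Apply the remaining options (init callback, file range, name mapping, decryption) and
  // wrap the builder in an iterable.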
  if (callInit) {
    builder.callInit();
  }

  if (start != null) {
    builder.withFileRange(start, start + length);
  }

  if (nameMapping != null) {
    builder.withNameMapping(nameMapping);
  }

  if (fileDecryptionProperties != null) {
    builder.withDecryption(fileDecryptionProperties);
  }

  return new ParquetIterable<>(builder);
}