in parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java [348:466]
public <D> FileAppender<D> build() throws IOException {
  Preconditions.checkNotNull(schema, "Schema is required");
  Preconditions.checkNotNull(name, "Table name is required and cannot be null");

  // add the Iceberg schema to keyValueMetadata
  meta("iceberg.schema", SchemaParser.toJson(schema));

  // Map Iceberg properties to pass down to the Parquet writer
  Context context = createContextFunc.apply(config);
  int rowGroupSize = context.rowGroupSize();
  int pageSize = context.pageSize();
  int pageRowLimit = context.pageRowLimit();
  int dictionaryPageSize = context.dictionaryPageSize();
  String compressionLevel = context.compressionLevel();
  CompressionCodecName codec = context.codec();
  int rowGroupCheckMinRecordCount = context.rowGroupCheckMinRecordCount();
  int rowGroupCheckMaxRecordCount = context.rowGroupCheckMaxRecordCount();
  int bloomFilterMaxBytes = context.bloomFilterMaxBytes();
  boolean dictionaryEnabled = context.dictionaryEnabled();

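  // map the codec-agnostic compression level onto the codec-specific Parquet config key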
  if (compressionLevel != null) {
    switch (codec) {
      case GZIP:
        config.put("zlib.compress.level", compressionLevel);
        break;
      case BROTLI:
        config.put("compression.brotli.quality", compressionLevel);
        break;
      case ZSTD:
        // keep "io.compression.codec.zstd.level" for backwards compatibility
        config.put("io.compression.codec.zstd.level", compressionLevel);
        config.put("parquet.compression.codec.zstd.level", compressionLevel);
        break;
      default:
        // compression level is not supported; ignore it
    }
  }

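  // have parquet-avro write the modern three-level list structure instead of the legacy two-level one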
set("parquet.avro.write-old-list-structure", "false");
MessageType type = ParquetSchemaUtil.convert(schema, name, variantShreddingFunc);
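  // native Parquet modular encryption: with withoutAADPrefixStorage(), the AAD prefix is
  // supplied to readers out-of-band instead of being stored in the file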
  FileEncryptionProperties fileEncryptionProperties = null;
  if (fileEncryptionKey != null) {
    byte[] encryptionKeyArray = ByteBuffers.toByteArray(fileEncryptionKey);
    byte[] aadPrefixArray = ByteBuffers.toByteArray(fileAADPrefix);

    fileEncryptionProperties =
        FileEncryptionProperties.builder(encryptionKeyArray)
            .withAADPrefix(aadPrefixArray)
            .withoutAADPrefixStorage()
            .build();
  } else {
    Preconditions.checkState(fileAADPrefix == null, "AAD prefix set with null encryption key");
  }

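  // two write paths: an Iceberg ParquetValueWriter (createWriterFunc) goes through the internal
  // ParquetWriter; otherwise a parquet-mr WriteSupport is wrapped in ParquetWriteAdapter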
  if (createWriterFunc != null) {
    Preconditions.checkArgument(
        writeSupport == null, "Cannot write with both write support and Parquet value writer");

    for (Map.Entry<String, String> entry : config.entrySet()) {
      conf.set(entry.getKey(), entry.getValue());
    }

    ParquetProperties.Builder propsBuilder =
        ParquetProperties.builder()
            .withWriterVersion(writerVersion)
            .withPageSize(pageSize)
            .withPageRowCountLimit(pageRowLimit)
            .withDictionaryEncoding(dictionaryEnabled)
            .withDictionaryPageSize(dictionaryPageSize)
            .withMinRowCountForPageSizeCheck(rowGroupCheckMinRecordCount)
            .withMaxRowCountForPageSizeCheck(rowGroupCheckMaxRecordCount)
            .withMaxBloomFilterBytes(bloomFilterMaxBytes);

    setBloomFilterConfig(
        context, type, propsBuilder::withBloomFilterEnabled, propsBuilder::withBloomFilterFPP);

    ParquetProperties parquetProperties = propsBuilder.build();

    return new org.apache.iceberg.parquet.ParquetWriter<>(
        conf,
        file,
        schema,
        type,
        rowGroupSize,
        metadata,
        createWriterFunc,
        codec,
        parquetProperties,
        metricsConfig,
        writeMode,
        fileEncryptionProperties);
  } else {
    ParquetWriteBuilder<D> parquetWriteBuilder =
        new ParquetWriteBuilder<D>(ParquetIO.file(file))
            .withWriterVersion(writerVersion)
            .setType(type)
            .setConfig(config)
            .setKeyValueMetadata(metadata)
            .setWriteSupport(getWriteSupport(type))
            .withCompressionCodec(codec)
            .withWriteMode(writeMode)
            .withRowGroupSize(rowGroupSize)
            .withPageSize(pageSize)
            .withPageRowCountLimit(pageRowLimit)
            .withDictionaryEncoding(dictionaryEnabled)
            .withDictionaryPageSize(dictionaryPageSize)
            .withEncryption(fileEncryptionProperties);

    setBloomFilterConfig(
        context,
        type,
        parquetWriteBuilder::withBloomFilterEnabled,
        parquetWriteBuilder::withBloomFilterFPP);

    return new ParquetWriteAdapter<>(parquetWriteBuilder.build(), metricsConfig);
  }
}
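
A minimal usage sketch for context (not part of Parquet.java): the table name, writer factory, and property values below are illustrative assumptions; the "write.parquet.*" keys are the standard Iceberg table properties that feed the Context used above.

// Sketch only: `writerFactory` stands in for a model-specific ParquetValueWriter factory.
import java.io.IOException;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetValueWriter;
import org.apache.parquet.schema.MessageType;

class ParquetAppenderExample {
  static <D> FileAppender<D> newAppender(
      OutputFile outputFile,
      Schema schema,
      Function<MessageType, ParquetValueWriter<?>> writerFactory)
      throws IOException {
    return Parquet.write(outputFile)
        .schema(schema)
        .named("table")                                  // required, checked at the top of build()
        .createWriterFunc(writerFactory)                 // selects the internal ParquetWriter path
        .set("write.parquet.compression-codec", "zstd")  // surfaces as Context.codec()
        .set("write.parquet.compression-level", "3")     // routed through the codec switch above
        .overwrite()
        .build();
  }
}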