in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java [485:578]
  public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec, Mode mode)
      throws IOException, InterruptedException {
    final WriteSupport<T> writeSupport = getWriteSupport(conf);
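
    // Build the writer-level ParquetProperties (page sizes, dictionary, bloom filter,
    // column index and statistics settings) from the job Configuration.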
    ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
        .withPageSize(getPageSize(conf))
        .withDictionaryPageSize(getDictionaryPageSize(conf))
        .withDictionaryEncoding(getEnableDictionary(conf))
        .withWriterVersion(getWriterVersion(conf))
        .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
        .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
        .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
        .withPageValueCountThreshold(getValueCountThreshold(conf))
        .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
        .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
        .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
        .withBloomFilterEnabled(getBloomFilterEnabled(conf))
        .withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
        .withPageRowCountLimit(getPageRowCountLimit(conf))
        .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
        .withStatisticsEnabled(getStatisticsEnabled(conf));
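
    // Layer per-column overrides (dictionary encoding, bloom filter, statistics)
    // on top of the global defaults configured above.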
    new ColumnConfigParser()
        .withColumnConfig(
            ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
        .withColumnConfig(
            BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false), propsBuilder::withBloomFilterEnabled)
        .withColumnConfig(
            BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV)
        .withColumnConfig(
            BLOOM_FILTER_FPP,
            key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
            propsBuilder::withBloomFilterFPP)
        .withColumnConfig(
            BLOOM_FILTER_CANDIDATES_NUMBER,
            key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER),
            propsBuilder::withBloomFilterCandidatesNumber)
        .withColumnConfig(
            STATISTICS_ENABLED,
            key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED),
            propsBuilder::withStatisticsEnabled)
        .parseConfig(conf);

    ParquetProperties props = propsBuilder.build();
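
    // Row group size, padding and record validation are file-level settings
    // kept outside ParquetProperties.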
    long blockSize = getLongBlockSize(conf);
    int maxPaddingSize = getMaxPaddingSize(conf);
    boolean validating = getValidation(conf);
    LOG.info(
        "ParquetRecordWriter [block size: {}b, row group padding size: {}b, validating: {}]",
        blockSize,
        maxPaddingSize,
        validating);
    LOG.debug("Parquet properties are:\n{}", props);
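
    // Let the WriteSupport derive the file schema and extra key/value metadata
    // from the configuration.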
    WriteContext fileWriteContext = writeSupport.init(conf);
    FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);
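
    // Open the output file and write the leading magic bytes.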
    ParquetFileWriter w = new ParquetFileWriter(
        HadoopOutputFile.fromPath(file, conf),
        fileWriteContext.getSchema(),
        mode,
        blockSize,
        maxPaddingSize,
        encryptionProperties,
        props);
    w.start();
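
    // The MemoryManager is a static, JVM-wide instance shared by all writers created
    // through this output format; it is configured only once.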
    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO, MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
    long minAllocation =
        conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION, MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
    synchronized (ParquetOutputFormat.class) {
      if (memoryManager == null) {
        memoryManager = new MemoryManager(maxLoad, minAllocation);
      }
    }
    if (memoryManager.getMemoryPoolRatio() != maxLoad) {
      LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not "
          + "be reset by the new value: " + maxLoad);
    }
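
    // The record writer buffers records into row groups and relies on the shared
    // MemoryManager to scale row group sizes when memory is constrained.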
    return new ParquetRecordWriter<T>(
        w,
        writeSupport,
        fileWriteContext.getSchema(),
        fileWriteContext.getExtraMetaData(),
        blockSize,
        codec,
        validating,
        props,
        memoryManager,
        conf);
  }