public RecordWriter&lt;Void, T&gt; getRecordWriter(Configuration, Path, CompressionCodecName, Mode)

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java [447:524]
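
Builds the ParquetRecordWriter for a single output file: it assembles ParquetProperties from the job Configuration (including per-column overrides), initializes the configured WriteSupport to obtain the file schema and extra metadata, opens a (possibly encrypted) ParquetFileWriter, and registers the writer with the JVM-wide MemoryManager.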


  public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec, Mode mode)
        throws IOException, InterruptedException {
    final WriteSupport<T> writeSupport = getWriteSupport(conf);

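    // Translate job-level tuning from the Configuration (page sizes, dictionary
    // encoding, bloom filters, row-count limits, page checksums) into ParquetProperties.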
    ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
        .withPageSize(getPageSize(conf))
        .withDictionaryPageSize(getDictionaryPageSize(conf))
        .withDictionaryEncoding(getEnableDictionary(conf))
        .withWriterVersion(getWriterVersion(conf))
        .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
        .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
        .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
        .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
        .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
        .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
        .withBloomFilterEnabled(getBloomFilterEnabled(conf))
        .withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
        .withPageRowCountLimit(getPageRowCountLimit(conf))
        .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf));
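    // Apply per-column overrides; ColumnConfigParser reads keys of the form
    // "<property>#<column.path>", e.g. parquet.bloom.filter.enabled#user.id.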
    new ColumnConfigParser()
        .withColumnConfig(ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
        .withColumnConfig(BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false),
            propsBuilder::withBloomFilterEnabled)
        .withColumnConfig(BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV)
        .withColumnConfig(BLOOM_FILTER_FPP, key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
            propsBuilder::withBloomFilterFPP)
        .withColumnConfig(
          BLOOM_FILTER_CANDIDATES_NUMBER,
          key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER),
          propsBuilder::withBloomFilterCandidatesNumber)
        .parseConfig(conf);

    ParquetProperties props = propsBuilder.build();

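    // Row-group sizing and alignment padding; 'validating' re-checks each
    // record against the schema as it is written.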
    long blockSize = getLongBlockSize(conf);
    int maxPaddingSize = getMaxPaddingSize(conf);
    boolean validating = getValidation(conf);

    LOG.info(
        "ParquetRecordWriter [block size: {}b, row group padding size: {}b, validating: {}]",
        blockSize, maxPaddingSize, validating);
    LOG.debug("Parquet properties are:\n{}", props);

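    // Let the WriteSupport finalize the file schema and any extra key/value metadata.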
    WriteContext fileWriteContext = writeSupport.init(conf);

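    // Resolve optional column-encryption settings; a null result means the file
    // is written unencrypted.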
    FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);

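    // Open the destination file and write the leading magic bytes via start().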
    ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
        fileWriteContext.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
        props.getStatisticsTruncateLength(), props.getPageWriteChecksumEnabled(), encryptionProperties);
    w.start();

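    // The MemoryManager is shared JVM-wide across writers; it scales row-group
    // buffer sizes down when the combined allocation would exceed the heap pool.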
    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
        MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
    long minAllocation = conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION,
        MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
    synchronized (ParquetOutputFormat.class) {
      if (memoryManager == null) {
        memoryManager = new MemoryManager(maxLoad, minAllocation);
      }
    }
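    // The pool ratio is fixed by whichever writer created the manager first;
    // a conflicting value here is only warned about, not applied.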
    if (memoryManager.getMemoryPoolRatio() != maxLoad) {
      LOG.warn("The configuration {} has been set. It should not be reset by the new value: {}",
          MEMORY_POOL_RATIO, maxLoad);
    }

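    // Wrap the open file writer in a ParquetRecordWriter, which buffers records
    // and flushes a row group once the buffered size reaches blockSize.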
    return new ParquetRecordWriter<T>(
        w,
        writeSupport,
        fileWriteContext.getSchema(),
        fileWriteContext.getExtraMetaData(),
        blockSize,
        codec,
        validating,
        props,
        memoryManager,
        conf);
  }
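
For context, here is a minimal sketch of calling this overload directly with the bundled example Group model. The class name DirectWriterExample, the schema, the output path, and the codec choice are illustrative assumptions, not part of the method above; any WriteSupport&lt;T&gt; registered via WRITE_SUPPORT_CLASS is resolved the same way by getWriteSupport(conf).

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class DirectWriterExample {
  public static void main(String[] args) throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    // Tell ParquetOutputFormat which WriteSupport to instantiate.
    conf.set(ParquetOutputFormat.WRITE_SUPPORT_CLASS, GroupWriteSupport.class.getName());
    MessageType schema = MessageTypeParser.parseMessageType(
        "message example { required int32 id; required binary name (UTF8); }");
    GroupWriteSupport.setSchema(schema, conf);

    ParquetOutputFormat<Group> format = new ParquetOutputFormat<>();
    RecordWriter<Void, Group> writer = format.getRecordWriter(
        conf,
        new Path("/tmp/example.parquet"),   // illustrative output path
        CompressionCodecName.SNAPPY,
        ParquetFileWriter.Mode.CREATE);
    try {
      Group record = new SimpleGroupFactory(schema).newGroup()
          .append("id", 1)
          .append("name", "parquet");
      writer.write(null, record);           // the key type is Void
    } finally {
      writer.close(null);                   // the TaskAttemptContext is unused by this writer
    }
  }
}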