public RecordWriter&lt;Void, T&gt; getRecordWriter(Configuration, Path, CompressionCodecName, Mode)

in parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java [485:578]

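This method assembles the write path end to end: it builds ParquetProperties from the job Configuration (including per-column overrides), initializes the WriteSupport to obtain the file schema, opens a ParquetFileWriter, and returns a ParquetRecordWriter registered with a JVM-shared MemoryManager.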

  public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, CompressionCodecName codec, Mode mode)
      throws IOException, InterruptedException {
    final WriteSupport<T> writeSupport = getWriteSupport(conf);

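    // Gather the file-level writer defaults (page sizes, dictionary encoding,
    // bloom filters, column/statistics truncation, checksums) from the Configuration.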
    ParquetProperties.Builder propsBuilder = ParquetProperties.builder()
        .withPageSize(getPageSize(conf))
        .withDictionaryPageSize(getDictionaryPageSize(conf))
        .withDictionaryEncoding(getEnableDictionary(conf))
        .withWriterVersion(getWriterVersion(conf))
        .estimateRowCountForPageSizeCheck(getEstimatePageSizeCheck(conf))
        .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
        .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
        .withPageValueCountThreshold(getValueCountThreshold(conf))
        .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
        .withStatisticsTruncateLength(getStatisticsTruncateLength(conf))
        .withMaxBloomFilterBytes(getBloomFilterMaxBytes(conf))
        .withBloomFilterEnabled(getBloomFilterEnabled(conf))
        .withAdaptiveBloomFilterEnabled(getAdaptiveBloomFilterEnabled(conf))
        .withPageRowCountLimit(getPageRowCountLimit(conf))
        .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
        .withStatisticsEnabled(getStatisticsEnabled(conf));
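    // Layer per-column overrides on top of the global settings. ColumnConfigParser
    // matches keys of the form "<property>#<column.path>", e.g.
    // "parquet.bloom.filter.enabled#user.id" to enable the bloom filter for one column.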
    new ColumnConfigParser()
        .withColumnConfig(
            ENABLE_DICTIONARY, key -> conf.getBoolean(key, false), propsBuilder::withDictionaryEncoding)
        .withColumnConfig(
            BLOOM_FILTER_ENABLED, key -> conf.getBoolean(key, false), propsBuilder::withBloomFilterEnabled)
        .withColumnConfig(
            BLOOM_FILTER_EXPECTED_NDV, key -> conf.getLong(key, -1L), propsBuilder::withBloomFilterNDV)
        .withColumnConfig(
            BLOOM_FILTER_FPP,
            key -> conf.getDouble(key, ParquetProperties.DEFAULT_BLOOM_FILTER_FPP),
            propsBuilder::withBloomFilterFPP)
        .withColumnConfig(
            BLOOM_FILTER_CANDIDATES_NUMBER,
            key -> conf.getInt(key, ParquetProperties.DEFAULT_BLOOM_FILTER_CANDIDATES_NUMBER),
            propsBuilder::withBloomFilterCandidatesNumber)
        .withColumnConfig(
            STATISTICS_ENABLED,
            key -> conf.getBoolean(key, ParquetProperties.DEFAULT_STATISTICS_ENABLED),
            propsBuilder::withStatisticsEnabled)
        .parseConfig(conf);

    ParquetProperties props = propsBuilder.build();

    long blockSize = getLongBlockSize(conf);
    int maxPaddingSize = getMaxPaddingSize(conf);
    boolean validating = getValidation(conf);

    LOG.info(
        "ParquetRecordWriter [block size: {}b, row group padding size: {}b, validating: {}]",
        blockSize,
        maxPaddingSize,
        validating);
    LOG.debug("Parquet properties are:\n{}", props);

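    // Ask the WriteSupport for the file schema and any extra key/value metadata
    // to embed in the file footer.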
    WriteContext fileWriteContext = writeSupport.init(conf);

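    // Encryption properties may be null, in which case the file is written unencrypted.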
    FileEncryptionProperties encryptionProperties = createEncryptionProperties(conf, file, fileWriteContext);

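    // Create the output file; start() writes the leading magic bytes.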
    ParquetFileWriter w = new ParquetFileWriter(
        HadoopOutputFile.fromPath(file, conf),
        fileWriteContext.getSchema(),
        mode,
        blockSize,
        maxPaddingSize,
        encryptionProperties,
        props);
    w.start();

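    // The MemoryManager balances row-group buffer memory across all writers in this
    // JVM, so it is created once, lazily, under a class-level lock; a later attempt
    // to change MEMORY_POOL_RATIO is ignored with a warning below.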
    float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO, MemoryManager.DEFAULT_MEMORY_POOL_RATIO);
    long minAllocation =
        conf.getLong(ParquetOutputFormat.MIN_MEMORY_ALLOCATION, MemoryManager.DEFAULT_MIN_MEMORY_ALLOCATION);
    synchronized (ParquetOutputFormat.class) {
      if (memoryManager == null) {
        memoryManager = new MemoryManager(maxLoad, minAllocation);
      }
    }
    if (memoryManager.getMemoryPoolRatio() != maxLoad) {
      LOG.warn("The configuration " + MEMORY_POOL_RATIO + " has been set. It should not "
          + "be reset by the new value: " + maxLoad);
    }

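    // The returned writer registers itself with the memory manager and, on close,
    // flushes remaining row groups and writes the footer.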
    return new ParquetRecordWriter<T>(
        w,
        writeSupport,
        fileWriteContext.getSchema(),
        fileWriteContext.getExtraMetaData(),
        blockSize,
        codec,
        validating,
        props,
        memoryManager,
        conf);
  }
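
For context, a minimal caller sketch (not part of the source file). It assumes a hypothetical record type MyRecord and a hypothetical WriteSupport implementation MyWriteSupport, registered through the standard parquet.write.support.class key; ParquetRecordWriter ignores the TaskAttemptContext passed to close, so null is used here.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class WriteExample {
  static void writeOne(MyRecord record) throws IOException, InterruptedException {
    Configuration conf = new Configuration();
    // MyWriteSupport is a hypothetical WriteSupport<MyRecord> implementation;
    // getWriteSupport(conf) will instantiate it from this key.
    conf.setClass(ParquetOutputFormat.WRITE_SUPPORT_CLASS, MyWriteSupport.class, WriteSupport.class);

    ParquetOutputFormat<MyRecord> format = new ParquetOutputFormat<>();
    RecordWriter<Void, MyRecord> writer = format.getRecordWriter(
        conf,
        new Path("/tmp/example.parquet"),
        CompressionCodecName.SNAPPY,
        ParquetFileWriter.Mode.CREATE);
    try {
      writer.write(null, record); // the key is unused (Void); only the value is written
    } finally {
      writer.close(null); // flushes buffered row groups and writes the footer
    }
  }
}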