gobblin-modules/gobblin-parquet-apache/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java [42:116]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> {

  /**
   * Build a version-specific {@link ParquetWriter} for the given {@link ParquetWriterConfiguration}.
   * @param writerConfiguration configuration carrying the staging file path, codec, block/page/dictionary-page
   *                            sizes, writer version and record format
   * @return a {@link ParquetWriterShim} that delegates write and close calls to the underlying writer
   * @throws IOException if the underlying Parquet writer cannot be created
   */
  @Override
  public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
      throws IOException {

    CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName());
    ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion
        .fromString(writerConfiguration.getWriterVersion());

    Configuration conf = new Configuration();
    ParquetWriter versionSpecificWriter = null;
    switch (writerConfiguration.getRecordFormat()) {
      case GROUP: {
        // Parquet "example" API: register the Group schema on the Hadoop Configuration read by GroupWriteSupport.
        GroupWriteSupport.setSchema((MessageType) this.schema, conf);
        WriteSupport support = new GroupWriteSupport();
        versionSpecificWriter = new ParquetWriter<Group>(
            writerConfiguration.getAbsoluteStagingFile(),
            support,
            codecName,
            writerConfiguration.getBlockSize(),
            writerConfiguration.getPageSize(),
            writerConfiguration.getDictPageSize(),
            writerConfiguration.isDictionaryEnabled(),
            writerConfiguration.isValidate(),
            writerVersion,
            conf);
        break;
      }
      case AVRO: {
        // parquet-avro: records are converted against the Avro Schema carried by this builder.
        versionSpecificWriter = new AvroParquetWriter(
            writerConfiguration.getAbsoluteStagingFile(),
            (Schema) this.schema,
            codecName,
            writerConfiguration.getBlockSize(),
            writerConfiguration.getPageSize(),
            writerConfiguration.isDictionaryEnabled(),
            conf);
        break;
      }
      case PROTOBUF: {
        // parquet-protobuf: the schema is the generated protobuf Message class itself.
        versionSpecificWriter = new ProtoParquetWriter(
            writerConfiguration.getAbsoluteStagingFile(),
            (Class<? extends Message>) this.schema,
            codecName,
            writerConfiguration.getBlockSize(),
            writerConfiguration.getPageSize(),
            writerConfiguration.isDictionaryEnabled(),
            writerConfiguration.isValidate());
        break;
      }
      default:
        throw new RuntimeException("Record format " + writerConfiguration.getRecordFormat() + " not supported");
    }
    // Effectively final copy so the anonymous ParquetWriterShim below can capture it.
    ParquetWriter finalVersionSpecificWriter = versionSpecificWriter;

    return new ParquetWriterShim() {
      @Override
      public void write(Object record)
          throws IOException {
        finalVersionSpecificWriter.write(record);
      }

      @Override
      public void close()
          throws IOException {
        finalVersionSpecificWriter.close();
      }
    };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
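
The GROUP branch above is the Parquet "example" API wiring: the MessageType schema is registered on the Hadoop Configuration, a GroupWriteSupport is handed to the deprecated ParquetWriter constructor, and Group records are written through it. Below is a minimal standalone sketch of that same wiring outside Gobblin; the schema string, output path, codec and default sizes are illustrative assumptions, not values taken from ParquetWriterConfiguration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class GroupWriterSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical schema and staging path, standing in for this.schema and getAbsoluteStagingFile().
    MessageType schema = MessageTypeParser.parseMessageType(
        "message event { required int32 id; required binary name (UTF8); }");
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);

    ParquetWriter<Group> writer = new ParquetWriter<>(
        new Path("/tmp/event.parquet"),          // getAbsoluteStagingFile()
        new GroupWriteSupport(),
        CompressionCodecName.SNAPPY,             // codecName
        ParquetWriter.DEFAULT_BLOCK_SIZE,        // getBlockSize()
        ParquetWriter.DEFAULT_PAGE_SIZE,         // getPageSize()
        ParquetWriter.DEFAULT_PAGE_SIZE,         // getDictPageSize()
        true,                                    // isDictionaryEnabled()
        false,                                   // isValidate()
        ParquetProperties.WriterVersion.PARQUET_1_0,
        conf);

    Group record = new SimpleGroupFactory(schema).newGroup()
        .append("id", 1)
        .append("name", "click");
    writer.write(record);
    writer.close();
  }
}

The constructor arguments appear in the same order as in the GROUP case above: block size, page size, dictionary page size, dictionary flag, validation flag, writer version, then the Configuration that carries the schema.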



gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/writer/ParquetDataWriterBuilder.java [41:115]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@Slf4j
public class ParquetDataWriterBuilder<S,D> extends AbstractParquetDataWriterBuilder<S,D> {

  /**
   * Build a version-specific {@link ParquetWriter} for the given {@link ParquetWriterConfiguration}.
   * @param writerConfiguration configuration carrying the staging file path, codec, block/page/dictionary-page
   *                            sizes, writer version and record format
   * @return a {@link ParquetWriterShim} that delegates write and close calls to the underlying writer
   * @throws IOException if the underlying Parquet writer cannot be created
   */
  @Override
  public ParquetWriterShim getVersionSpecificWriter(ParquetWriterConfiguration writerConfiguration)
      throws IOException {

    CompressionCodecName codecName = CompressionCodecName.fromConf(writerConfiguration.getCodecName());
    ParquetProperties.WriterVersion writerVersion = ParquetProperties.WriterVersion
        .fromString(writerConfiguration.getWriterVersion());

    Configuration conf = new Configuration();
    ParquetWriter versionSpecificWriter = null;
    switch (writerConfiguration.getRecordFormat()) {
      case GROUP: {
        // Parquet "example" API: register the Group schema on the Hadoop Configuration read by GroupWriteSupport.
        GroupWriteSupport.setSchema((MessageType) this.schema, conf);
        WriteSupport support = new GroupWriteSupport();
        versionSpecificWriter = new ParquetWriter<Group>(
            writerConfiguration.getAbsoluteStagingFile(),
            support,
            codecName,
            writerConfiguration.getBlockSize(),
            writerConfiguration.getPageSize(),
            writerConfiguration.getDictPageSize(),
            writerConfiguration.isDictionaryEnabled(),
            writerConfiguration.isValidate(),
            writerVersion,
            conf);
        break;
      }
      case AVRO: {
        // parquet-avro: records are converted against the Avro Schema carried by this builder.
        versionSpecificWriter = new AvroParquetWriter(
            writerConfiguration.getAbsoluteStagingFile(),
            (Schema) this.schema,
            codecName,
            writerConfiguration.getBlockSize(),
            writerConfiguration.getPageSize(),
            writerConfiguration.isDictionaryEnabled(),
            conf);
        break;
      }
      case PROTOBUF: {
        // parquet-protobuf: the schema is the generated protobuf Message class itself.
        versionSpecificWriter = new ProtoParquetWriter(
            writerConfiguration.getAbsoluteStagingFile(),
            (Class<? extends Message>) this.schema,
            codecName,
            writerConfiguration.getBlockSize(),
            writerConfiguration.getPageSize(),
            writerConfiguration.isDictionaryEnabled(),
            writerConfiguration.isValidate());
        break;
      }
      default:
        throw new RuntimeException("Record format " + writerConfiguration.getRecordFormat() + " not supported");
    }
    // Effectively final copy so the anonymous ParquetWriterShim below can capture it.
    ParquetWriter finalVersionSpecificWriter = versionSpecificWriter;

    return new ParquetWriterShim() {
      @Override
      public void write(Object record)
          throws IOException {
        finalVersionSpecificWriter.write(record);
      }

      @Override
      public void close()
          throws IOException {
        finalVersionSpecificWriter.close();
      }
    };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
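
The AVRO branch follows the same pattern through parquet-avro: the builder's schema is an Avro Schema, and AvroParquetWriter converts generic (or specific) records to Parquet. Below is a minimal standalone sketch under illustrative assumptions for the schema, path, codec and sizes; it uses the org.apache.parquet coordinates for brevity, whereas this gobblin-parquet module links against the older parquet.* packages with the same constructor shape, as the excerpt above shows.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

public class AvroWriterSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical Avro schema and staging path, standing in for this.schema and getAbsoluteStagingFile().
    Schema schema = SchemaBuilder.record("Event").fields()
        .requiredLong("ts")
        .requiredString("name")
        .endRecord();

    ParquetWriter<GenericRecord> writer = new AvroParquetWriter<>(
        new Path("/tmp/event.parquet"),          // getAbsoluteStagingFile()
        schema,                                  // (Schema) this.schema
        CompressionCodecName.SNAPPY,             // codecName
        ParquetWriter.DEFAULT_BLOCK_SIZE,        // getBlockSize()
        ParquetWriter.DEFAULT_PAGE_SIZE,         // getPageSize()
        true,                                    // isDictionaryEnabled()
        new Configuration());

    GenericRecord record = new GenericData.Record(schema);
    record.put("ts", System.currentTimeMillis());
    record.put("name", "click");
    writer.write(record);
    writer.close();
  }
}

In both modules the returned ParquetWriterShim only forwards write and close to the underlying writer, so records written through the shim end up in the staging file exactly as in these sketches.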



