public PipelineResult run()

in src/main/java/com/google/cloud/solutions/autotokenize/pipeline/EncryptionPipeline.java [120:150]


  /**
   * Attaches the configured output sinks to the encrypted records and starts the pipeline.
   *
   * <p>The encrypted records come from {@code applyReadAndEncryptionSteps()} followed by {@code
   * RecordNester.forSchema(...)}, using the schema returned by {@code buildEncryptedSchema()}.
   * Each sink is optional and is attached only when its option is non-null:
   *
   * <ul>
   *   <li>output directory: Snappy-compressed files with the {@code .avro} suffix, written with
   *       the prefix {@code <cleaned directory>/data};
   *   <li>output BigQuery table: records pass through a random-key reshuffle, get an Avro schema
   *       coder, and are written using the Beam schema. The table is appended to when {@code
   *       getBigQueryAppend()} is true and truncated otherwise.
   * </ul>
   *
   * <p>When neither option is set, no sink is attached by this method and the pipeline is still
   * run.
   *
   * @return the value returned by {@code pipeline.run()}
   * @throws Exception propagated from any of the pipeline construction steps
   */
  public PipelineResult run() throws Exception {
    var encryptedSchema = buildEncryptedSchema();
    var sourceRecords = applyReadAndEncryptionSteps();
    var encryptedRecords = sourceRecords.apply(RecordNester.forSchema(encryptedSchema));

    // Sink 1: Avro files, only when an output directory was requested.
    var outputDirectory = options.getOutputDirectory();
    if (outputDirectory != null) {
      var avroFilePrefix = cleanDirectoryString(outputDirectory) + "/data";
      var avroSink =
          AvroIO.writeGenericRecords(encryptedSchema)
              .withSuffix(".avro")
              .to(avroFilePrefix)
              .withCodec(CodecFactory.snappyCodec());
      encryptedRecords.apply("WriteAVRO", avroSink);
    }

    // Sink 2: BigQuery table, only when a destination table was requested.
    var bigQueryTable = options.getOutputBigQueryTable();
    if (bigQueryTable != null) {
      var reshuffledRecords = encryptedRecords.apply("Reshuffle", Reshuffle.viaRandomKey());
      // The coder is set explicitly on the reshuffled collection before the schema-based write.
      reshuffledRecords.setCoder(AvroUtils.schemaCoder(GenericRecord.class, encryptedSchema));

      var writeDisposition = options.getBigQueryAppend() ? WRITE_APPEND : WRITE_TRUNCATE;
      var bigQuerySink =
          BigQueryIO.<GenericRecord>write()
              .to(bigQueryTable)
              .useBeamSchema()
              .optimizedWrites()
              .withWriteDisposition(writeDisposition);
      reshuffledRecords.apply("WriteToBigQuery", bigQuerySink);
    }

    return pipeline.run();
  }