in src/main/java/com/google/cloud/solutions/autotokenize/pipeline/EncryptionPipeline.java [120:150]
/**
 * Assembles the encryption pipeline's output stages and starts the pipeline.
 *
 * <p>The encrypted records produced by {@code applyReadAndEncryptionSteps()} are passed through
 * {@code RecordNester} for the encrypted schema and then written to each configured sink. The two
 * sinks are independent of each other; when both options are set, both writes are attached:
 *
 * <ul>
 *   <li>Output directory set: Snappy-compressed Avro files with the {@code .avro} suffix, written
 *       under {@code <outputDirectory>/data}.
 *   <li>Output BigQuery table set: records are reshuffled via a random key and written to the
 *       table, appending when {@code getBigQueryAppend()} is true and truncating otherwise.
 * </ul>
 *
 * @return the result handle returned by {@code pipeline.run()}
 * @throws Exception propagated from the steps invoked while assembling the pipeline
 */
public PipelineResult run() throws Exception {
  var encryptedSchema = buildEncryptedSchema();
  var nestedRecords =
      applyReadAndEncryptionSteps().apply(RecordNester.forSchema(encryptedSchema));

  // Optional sink #1: Avro files.
  var outputDirectory = options.getOutputDirectory();
  if (outputDirectory != null) {
    // NOTE(review): cleanDirectoryString presumably normalizes the directory before "/data" is
    // appended -- confirm against its definition.
    var avroTarget = cleanDirectoryString(outputDirectory) + "/data";
    var avroSink =
        AvroIO.writeGenericRecords(encryptedSchema)
            .withSuffix(".avro")
            .to(avroTarget)
            .withCodec(CodecFactory.snappyCodec());
    nestedRecords.apply("WriteAVRO", avroSink);
  }

  // Optional sink #2: BigQuery table.
  var outputTable = options.getOutputBigQueryTable();
  if (outputTable != null) {
    // The coder is set explicitly on the reshuffled collection using the encrypted schema.
    var reshuffledRecords =
        nestedRecords
            .apply("Reshuffle", Reshuffle.viaRandomKey())
            .setCoder(AvroUtils.schemaCoder(GenericRecord.class, encryptedSchema));

    var disposition = options.getBigQueryAppend() ? WRITE_APPEND : WRITE_TRUNCATE;
    var bigQuerySink =
        BigQueryIO.<GenericRecord>write()
            .to(outputTable)
            .useBeamSchema()
            .optimizedWrites()
            .withWriteDisposition(disposition);
    reshuffledRecords.apply("WriteToBigQuery", bigQuerySink);
  }

  return pipeline.run();
}