in src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/BQWriteTransform.java [65:99]
public WriteResult expand(PCollection<Row> row) {
switch (method()) {
case FILE_LOADS:
return row.apply(
BigQueryIO.<Row>write()
.to(tableSpec())
.useBeamSchema()
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withCustomGcsTempLocation(gcsTempLocation())
.withTriggeringFrequency(Duration.standardMinutes(batchFrequency()))
.withNumFileShards(NUM_OF_SHARDS));
case STREAMING_INSERTS:
return row.apply(
BigQueryIO.<Row>write()
.to(tableSpec())
.useBeamSchema()
.ignoreInsertIds()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));
default:
return row.apply(
BigQueryIO.<Row>write()
.to(tableSpec())
.useBeamSchema()
.ignoreInsertIds()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
}
}