in src/main/java/com/google/solutions/df/log/aggregations/common/BQWriteTransform.java [72:107]
public WriteResult expand(PCollection<Row> row) {
  // Applies a BigQuery write to the incoming rows. Every branch starts from the same
  // append-only configuration (see appendOnlyWrite()); the branches differ only in the
  // method-specific options layered on top of it.
  switch (method()) {
    case FILE_LOADS:
      {
        // Load-job path: files are staged under gcsTempLocation() and a load is triggered
        // every batchFrequency() minutes using Util.NUM_OF_SHARDS file shards. Clustering
        // uses clusterFields(); time partitioning uses the Util.DAY_PARTITION type.
        final BigQueryIO.Write<Row> loadJobWrite =
            appendOnlyWrite()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withCustomGcsTempLocation(gcsTempLocation())
                .withTriggeringFrequency(Duration.standardMinutes(batchFrequency()))
                .withNumFileShards(Util.NUM_OF_SHARDS)
                .ignoreInsertIds()
                .withClustering(new Clustering().setFields(clusterFields()))
                .withTimePartitioning(new TimePartitioning().setType(Util.DAY_PARTITION));
        return row.apply(loadJobWrite);
      }
    case STREAMING_INSERTS:
      {
        // Streaming path: the only addition to the shared configuration is ignoring
        // insert ids; no explicit withMethod(...) is set in this branch.
        final BigQueryIO.Write<Row> streamingWrite = appendOnlyWrite().ignoreInsertIds();
        return row.apply(streamingWrite);
      }
    default:
      {
        // Any other method value: the shared configuration is applied unchanged.
        final BigQueryIO.Write<Row> fallbackWrite = appendOnlyWrite();
        return row.apply(fallbackWrite);
      }
  }
}

/**
 * Builds the write configuration shared by every branch of {@code expand}: target table
 * {@code tableSpec()}, Beam-schema based row conversion, {@code WRITE_APPEND} write
 * disposition and {@code CREATE_NEVER} create disposition.
 *
 * @return a {@code BigQueryIO.Write<Row>} carrying only the options common to all methods
 */
private BigQueryIO.Write<Row> appendOnlyWrite() {
  return BigQueryIO.<Row>write()
      .to(tableSpec())
      .useBeamSchema()
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER);
}