public WriteResult expand()

in src/main/java/com/google/solutions/df/log/aggregations/common/fraud/detection/BQWriteTransform.java [65:99]


  public WriteResult expand(PCollection<Row> row) {

    switch (method()) {
      case FILE_LOADS:
        return row.apply(
            BigQueryIO.<Row>write()
                .to(tableSpec())
                .useBeamSchema()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withCustomGcsTempLocation(gcsTempLocation())
                .withTriggeringFrequency(Duration.standardMinutes(batchFrequency()))
                .withNumFileShards(NUM_OF_SHARDS));

      case STREAMING_INSERTS:
        return row.apply(
            BigQueryIO.<Row>write()
                .to(tableSpec())
                .useBeamSchema()
                .ignoreInsertIds()
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));

      default:
        return row.apply(
            BigQueryIO.<Row>write()
                .to(tableSpec())
                .useBeamSchema()
                .ignoreInsertIds()
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
    }
  }