public WriteResult expand()

in src/main/java/com/google/solutions/df/log/aggregations/common/BQWriteTransform.java [72:107]


  public WriteResult expand(PCollection<Row> row) {

    switch (method()) {
      case FILE_LOADS:
        return row.apply(
            BigQueryIO.<Row>write()
                .to(tableSpec())
                .useBeamSchema()
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withCustomGcsTempLocation(gcsTempLocation())
                .withTriggeringFrequency(Duration.standardMinutes(batchFrequency()))
                .withNumFileShards(Util.NUM_OF_SHARDS)
                .ignoreInsertIds()
                .withClustering(new Clustering().setFields(clusterFields()))
                .withTimePartitioning(new TimePartitioning().setType(Util.DAY_PARTITION)));

      case STREAMING_INSERTS:
        return row.apply(
            BigQueryIO.<Row>write()
                .to(tableSpec())
                .useBeamSchema()
                .ignoreInsertIds()
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));

      default:
        return row.apply(
            BigQueryIO.<Row>write()
                .to(tableSpec())
                .useBeamSchema()
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER));
    }
  }