private static void storeInBigQuery()

in order-book-pipeline/src/main/java/com/google/cloud/dataflow/orderbook/OrderBookProcessingPipeline.java [131:147]


  private static <Input> void storeInBigQuery(PCollection<Input> input, String tableName,
      String shortTableDescription, SerializableFunction<Input, TableRow> formatFunction) {
    WriteResult processingStatusWriteResult = input.apply(shortTableDescription + " to BQ",
        BigQueryIO.<Input>write()
            .to(tableName)
            .withFormatFunction(formatFunction)
            .withMethod(Method.STORAGE_WRITE_API)
            .withAutoSharding()
            .withTriggeringFrequency(Duration.standardSeconds(2))
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE)
    );
    processingStatusWriteResult.getFailedStorageApiInserts()
        .apply("DLQ for " + shortTableDescription,
            new FailedBigQueryInsertProcessor(tableName));
  }