in order-book-pipeline/src/main/java/com/google/cloud/dataflow/orderbook/OrderBookProcessingPipeline.java [131:147]
/**
 * Appends the elements of {@code input} to a BigQuery table and forwards the rows that failed
 * to be inserted to a {@link FailedBigQueryInsertProcessor}.
 *
 * <p>The write is configured to use the Storage Write API with auto-sharding and a two second
 * triggering frequency. Rows are appended ({@code WRITE_APPEND}) and the table is never created
 * by this step ({@code CREATE_NEVER}), so it is expected to exist already. Missing values are
 * interpreted using {@code MissingValueInterpretation.DEFAULT_VALUE}.
 *
 * @param input the collection whose elements are written
 * @param tableName destination table; also passed to the failed-insert processor
 * @param shortTableDescription short label used to build the names of the two pipeline steps
 * @param formatFunction converts each element into the {@link TableRow} that is written
 * @param <Input> element type of the collection
 */
private static <Input> void storeInBigQuery(PCollection<Input> input, String tableName,
    String shortTableDescription, SerializableFunction<Input, TableRow> formatFunction) {
  // Describe the sink first so the pipeline wiring below stays easy to scan.
  BigQueryIO.Write<Input> tableWriter = BigQueryIO.<Input>write()
      .to(tableName)
      .withFormatFunction(formatFunction)
      .withMethod(Method.STORAGE_WRITE_API)
      .withAutoSharding()
      .withTriggeringFrequency(Duration.standardSeconds(2))
      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
      .withCreateDisposition(CreateDisposition.CREATE_NEVER)
      .withDefaultMissingValueInterpretation(MissingValueInterpretation.DEFAULT_VALUE);

  WriteResult writeOutcome = input.apply(shortTableDescription + " to BQ", tableWriter);

  // Rows rejected by the Storage Write API go to the failed-insert processor
  // (presumably a dead-letter queue, judging by the step name — confirm against that class).
  writeOutcome.getFailedStorageApiInserts()
      .apply("DLQ for " + shortTableDescription, new FailedBigQueryInsertProcessor(tableName));
}