in java/java-dataflow-samples/read-pubsub-write-bigquery/src/main/java/com/cloudcode/dataflow/example/io/WriteRowToBigquery.java [44:94]
public WriteResult expand(PCollection<Row> input) {
// We implement this transform in two key steps: write to BigQuery, then collect any errors.
// Notice that the PCollection<Row> input carries a Beam Schema. This is a key reason we
// convert to Beam Rows in the first place: a single schema-aware data structure lets us
// read from and write to many different resources.
Schema schema = input.getSchema();
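// We keep a handle on this schema; it is reused below to convert successful inserts back into Rows.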
// We take the project and dataset from the datasetReference derived from our PipelineOptions,
// but autogenerate the table ID so that the write step below creates a unique BigQuery table
// for this pipeline run.
TableReference tableReference =
new TableReference()
.setProjectId(datasetReference.getProjectId())
.setDatasetId(datasetReference.getDatasetId())
.setTableId(UUID.randomUUID().toString());
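// The referenced table does not exist yet; CREATE_IF_NEEDED below creates it when the write runs.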
// We then apply the write to the BigQuery table.
org.apache.beam.sdk.io.gcp.bigquery.WriteResult result =
input.apply(
"Write To BigQuery",
BigQueryIO.<Row>write()
// useBeamSchema tells Beam to derive the BigQuery table schema automatically from the Row schema.
.useBeamSchema()
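// STREAMING_INSERTS uses the BigQuery streaming API; the failed and successful insert
// collections consumed below are produced by this insert path.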
.withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
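// CREATE_IF_NEEDED creates the table using the schema derived above if it is missing;
// WRITE_APPEND appends rows rather than truncating existing data.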
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.to(tableReference));
// We collect failed inserts, rendering each rejected TableRow as an error string.
PCollection<String> errors =
result
.getFailedInserts()
.apply(
"getFailedInsertsWithErr",
MapElements.into(strings())
.via(failedTableRow -> requireNonNull(failedTableRow).toString()));
// We collect successful inserts, converting each written TableRow back into a Beam Row.
PCollection<Row> success =
result
.getSuccessfulInserts()
.apply(
"successfulInserts",
MapElements.into(rows())
.via(tableRow -> BigQueryUtils.toBeamRow(schema, requireNonNull(tableRow))))
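// MapElements cannot infer a Row schema for its output, so we reattach the original schema explicitly.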
.setRowSchema(schema);
// Finally, we bundle the successful inserts and the errors into a WriteResult.
return WriteResult.of(input.getPipeline(), SUCCESS, success, ERROR, errors);
}