public WriteResult expand(PCollection<Row> input)

in java/java-dataflow-samples/read-pubsub-write-bigquery/src/main/java/com/cloudcode/dataflow/example/io/WriteRowToBigquery.java [44:94]


  public WriteResult expand(PCollection<Row> input) {
    // We implement this transform in two key steps: write to BigQuery, then collect any
    // insertion errors.

    // Notice that the input PCollection<Row> carries a Beam Schema.  This is a key reason
    // we convert to Beam Rows: a single schema-aware data structure lets us read from and
    // write to numerous resources.
    Schema schema = input.getSchema();
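    // For illustration (not part of the sample): a Row with schema (id: INT64, name: STRING)
    // writes as the BigQuery row {"id": 1, "name": "a"} with no hand-written TableSchema.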

    // Notice that we acquire the project and dataset from the datasetReference, derived from
    // our PipelineOptions.  The table ID, however, is a random UUID, so the write step below
    // creates a unique BigQuery table.
    TableReference tableReference =
        new TableReference()
            .setProjectId(datasetReference.getProjectId())
            .setDatasetId(datasetReference.getDatasetId())
            .setTableId(UUID.randomUUID().toString());
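    // For illustration: the resulting destination renders as a table spec of the form
    // "my-project:my_dataset.<random-uuid>" (e.g. via BigQueryHelpers.toTableSpec).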

    // Next, we write to the BigQuery table.
    org.apache.beam.sdk.io.gcp.bigquery.WriteResult result =
        input.apply(
            "Write To BigQuery",
            BigQueryIO.<Row>write()
                // useBeamSchema delegates to Beam to create the BigQuery table schema
                // automatically from the Row schema.
                .useBeamSchema()
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .to(tableReference));
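    // Note: the failed- and successful-insert collections consumed below depend on the
    // streaming insert method configured above; WriteResult.getSuccessfulInserts() is only
    // supported for STREAMING_INSERTS.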

    // We collect any failed inserts as error strings.
    PCollection<String> errors =
        result
            .getFailedInserts()
            .apply(
                "getFailedInsertsWithErr",
                MapElements.into(strings())
                    .via(failedTableRow -> requireNonNull(failedTableRow).toString()));
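    // Note: despite the step name above, this calls WriteResult.getFailedInserts(), which
    // yields only the failed TableRows; getFailedInsertsWithErr() would additionally carry
    // the insert error details (as BigQueryInsertError).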

    // We collect successful inserts, converting each TableRow back into a schema-aware Beam Row.
    PCollection<Row> success =
        result
            .getSuccessfulInserts()
            .apply(
                "successfulInserts",
                MapElements.into(rows())
                    .via(tableRow -> BigQueryUtils.toBeamRow(schema, requireNonNull(tableRow))))
            .setRowSchema(schema);

    // Finally, we bundle the successful inserts and the errors into our custom WriteResult.
    return WriteResult.of(input.getPipeline(), SUCCESS, success, ERROR, errors);
  }
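
For context, a caller applies this transform to a schema-aware PCollection<Row> and then
branches on the SUCCESS- and ERROR-tagged outputs. The sketch below is illustrative only:
the WriteRowToBigquery constructor shape and the getError() accessor on the sample's custom
WriteResult are assumptions, not the sample's confirmed API; it reuses the same strings()
static import as the method above, and LOG stands for any SLF4J logger defined on the class.

  WriteResult result =
      rows.apply("WriteRowToBigquery", new WriteRowToBigquery(datasetReference));

  // getError() is a hypothetical accessor for the ERROR-tagged PCollection<String>.
  result
      .getError()
      .apply(
          "LogErrors",
          MapElements.into(strings())
              .via(
                  err -> {
                    LOG.warn("BigQuery insert failed: {}", err);
                    return err;
                  }));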