public static void writeToBQ()

in java/src/main/java/com/google/cloud/dataproc/templates/pubsub/PubSubToBQ.java [111:171]


  /**
   * Writes every message of a Pub/Sub DStream to a BigQuery table through the BigQuery Storage
   * Write API.
   *
   * <p>For each non-empty partition of each RDD, a write stream of type {@code COMMITTED} is
   * created on the target table. Each message payload is parsed as one JSON object, and the
   * objects are appended in batches. Every append is awaited before the next batch is built, so a
   * failed append surfaces as an exception from the partition task. The write stream is finalized
   * once all rows of the partition have been appended.
   *
   * @param pubSubStream stream of Pub/Sub messages; each payload must be a UTF-8 encoded JSON
   *     object
   * @param outputProjectID project that owns the target table
   * @param pubSubBQOutputDataset dataset that contains the target table
   * @param PubSubBQOutputTable name of the target table
   * @param batchSize number of rows that triggers an append; rows left over at the end of a
   *     partition are appended as a final, smaller batch
   */
  public static void writeToBQ(
      JavaDStream<SparkPubsubMessage> pubSubStream,
      String outputProjectID,
      String pubSubBQOutputDataset,
      String PubSubBQOutputTable,
      Integer batchSize) {
    pubSubStream.foreachRDD(
        new VoidFunction<JavaRDD<SparkPubsubMessage>>() {
          @Override
          public void call(JavaRDD<SparkPubsubMessage> sparkPubsubMessageJavaRDD) throws Exception {
            sparkPubsubMessageJavaRDD.foreachPartition(
                new VoidFunction<Iterator<SparkPubsubMessage>>() {
                  @Override
                  public void call(Iterator<SparkPubsubMessage> sparkPubsubMessageIterator)
                      throws Exception {

                    // Nothing to write: avoid creating (and finalizing) a write stream for an
                    // empty partition.
                    if (!sparkPubsubMessageIterator.hasNext()) {
                      return;
                    }

                    // The client is AutoCloseable; close it when the partition is done so that
                    // its resources are not leaked on every micro-batch.
                    try (BigQueryWriteClient bqClient = BigQueryWriteClient.create()) {
                      WriteStream stream =
                          WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
                      TableName tableName =
                          TableName.of(
                              outputProjectID, pubSubBQOutputDataset, PubSubBQOutputTable);
                      CreateWriteStreamRequest createWriteStreamRequest =
                          CreateWriteStreamRequest.newBuilder()
                              .setParent(tableName.toString())
                              .setWriteStream(stream)
                              .build();
                      WriteStream writeStream =
                          bqClient.createWriteStream(createWriteStreamRequest);

                      try (JsonStreamWriter writer =
                          JsonStreamWriter.newBuilder(
                                  writeStream.getName(), writeStream.getTableSchema())
                              .build()) {

                        JSONArray jsonArr = new JSONArray();
                        while (sparkPubsubMessageIterator.hasNext()) {
                          SparkPubsubMessage message = sparkPubsubMessageIterator.next();
                          // Decode explicitly as UTF-8 instead of the executor's platform
                          // default charset.
                          JSONObject record =
                              new JSONObject(
                                  new String(
                                      message.getData(),
                                      java.nio.charset.StandardCharsets.UTF_8));
                          jsonArr.put(record);
                          if (jsonArr.length() == batchSize) {
                            // Block until the batch is acknowledged; get() throws if the
                            // append failed.
                            writer.append(jsonArr).get();
                            jsonArr = new JSONArray();
                          }
                        }
                        // Flush the trailing, partially filled batch.
                        if (jsonArr.length() > 0) {
                          writer.append(jsonArr).get();
                        }

                        // Finalize the stream after use.
                        FinalizeWriteStreamRequest finalizeWriteStreamRequest =
                            FinalizeWriteStreamRequest.newBuilder()
                                .setName(writeStream.getName())
                                .build();
                        bqClient.finalizeWriteStream(finalizeWriteStreamRequest);
                      }
                    }
                  }
                });
          }
        });
  }