in java/src/main/java/com/google/cloud/dataproc/templates/pubsub/PubSubToBQ.java [111:171]
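/**
 * Writes a stream of Pub/Sub messages to BigQuery using the Storage Write API.
 * Each micro-batch RDD is written per partition: a partition opens a committed
 * write stream, appends its messages as JSON rows in batches of {@code batchSize},
 * and finalizes the stream once the partition has been drained.
 */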
public static void writeToBQ(
JavaDStream<SparkPubsubMessage> pubSubStream,
String outputProjectID,
String pubSubBQOutputDataset,
String pubSubBQOutputTable,
Integer batchSize) {
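// For each micro-batch RDD, write every partition to BigQuery directly
// from the executors.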
pubSubStream.foreachRDD(
new VoidFunction<JavaRDD<SparkPubsubMessage>>() {
@Override
public void call(JavaRDD<SparkPubsubMessage> sparkPubsubMessageJavaRDD) throws Exception {
sparkPubsubMessageJavaRDD.foreachPartition(
new VoidFunction<Iterator<SparkPubsubMessage>>() {
@Override
public void call(Iterator<SparkPubsubMessage> sparkPubsubMessageIterator)
throws Exception {
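// Each partition opens its own write client and a COMMITTED-type stream,
// so rows become visible in the table as soon as an append succeeds.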
try (BigQueryWriteClient bqClient = BigQueryWriteClient.create()) {
  WriteStream stream =
      WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
  TableName tableName =
      TableName.of(outputProjectID, pubSubBQOutputDataset, pubSubBQOutputTable);
  CreateWriteStreamRequest createWriteStreamRequest =
      CreateWriteStreamRequest.newBuilder()
          .setParent(tableName.toString())
          .setWriteStream(stream)
          .build();
  WriteStream writeStream = bqClient.createWriteStream(createWriteStreamRequest);
  try (JsonStreamWriter writer =
      JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
          .build()) {
    JSONArray jsonArr = new JSONArray();
    while (sparkPubsubMessageIterator.hasNext()) {
      SparkPubsubMessage message = sparkPubsubMessageIterator.next();
      // Pub/Sub payloads are expected to be UTF-8 encoded JSON objects.
      JSONObject record =
          new JSONObject(
              new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8));
      jsonArr.put(record);
      if (jsonArr.length() == batchSize) {
        ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
        // Block until the batch is acknowledged before building the next one.
        future.get();
        jsonArr = new JSONArray();
      }
    }
    // Flush any remaining rows that did not fill a complete batch.
    if (jsonArr.length() > 0) {
      ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
      future.get();
    }
  }
  // Finalize the stream once the writer is closed and all appends are done.
  FinalizeWriteStreamRequest finalizeWriteStreamRequest =
      FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build();
  bqClient.finalizeWriteStream(finalizeWriteStreamRequest);
}
}
});
}
});
}
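
For context, here is a minimal sketch of how this method might be driven from the template's entry point, assuming Apache Bahir's spark-streaming-pubsub connector (which supplies PubsubUtils, SparkGCPCredentials, and SparkPubsubMessage). The project, subscription, dataset and table names, the 60-second batch interval, and the batch size of 1000 are illustrative placeholders, not the template's actual configuration:

import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.pubsub.PubsubUtils;
import org.apache.spark.streaming.pubsub.SparkGCPCredentials;
import org.apache.spark.streaming.pubsub.SparkPubsubMessage;

public class PubSubToBQRunner {
  public static void main(String[] args) throws InterruptedException {
    JavaStreamingContext jsc =
        new JavaStreamingContext(
            new SparkConf().setAppName("PubSubToBQ"), Durations.seconds(60));

    // Receive Pub/Sub messages using application-default credentials.
    JavaDStream<SparkPubsubMessage> stream =
        PubsubUtils.createStream(
            jsc,
            "my-input-project",        // placeholder project id
            "my-input-subscription",   // placeholder subscription name
            new SparkGCPCredentials.Builder().build(),
            StorageLevel.MEMORY_AND_DISK_SER());

    // Append rows in batches of 1000 per Storage Write API call (placeholder).
    PubSubToBQ.writeToBQ(stream, "my-output-project", "my_dataset", "my_table", 1000);

    jsc.start();
    jsc.awaitTermination();
  }
}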