in java/src/main/java/com/google/cloud/dataproc/templates/pubsub/PubSubToGCS.java [152:196]
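  /**
   * Writes messages from a Pub/Sub DStream to the given Cloud Storage bucket, either as
   * individual Avro objects or as JSON arrays batched in groups of {@code batchSize}.
   */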
  public static void writeToGCS(
      JavaDStream<SparkPubsubMessage> pubSubStream,
      String gcsBucketName,
      Integer batchSize,
      String outputDataFormat,
      Bucket bucket) {
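    // Runs once for every micro-batch RDD produced by the Pub/Sub stream.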
    pubSubStream.foreachRDD(
        new VoidFunction<JavaRDD<SparkPubsubMessage>>() {
          @Override
          public void call(JavaRDD<SparkPubsubMessage> sparkPubsubMessageJavaRDD) throws Exception {
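            // Each partition is written independently on the executors.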
            sparkPubsubMessageJavaRDD.foreachPartition(
                new VoidFunction<Iterator<SparkPubsubMessage>>() {
                  @Override
                  public void call(Iterator<SparkPubsubMessage> sparkPubsubMessageIterator)
                      throws Exception {
                    if (outputDataFormat.equalsIgnoreCase(PUBSUB_GCS_AVRO_EXTENSION)
                        || outputDataFormat.equalsIgnoreCase(PUBSUB_GCS_JSON_EXTENSION)) {
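                      // Accumulates JSON records until a full batch of batchSize is reached.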
                      JSONArray jsonArr = new JSONArray();
                      while (sparkPubsubMessageIterator.hasNext()) {
                        SparkPubsubMessage message = sparkPubsubMessageIterator.next();
                        if (outputDataFormat.equalsIgnoreCase(PUBSUB_GCS_AVRO_EXTENSION)) {
                          // Avro: write each message to the bucket as its own object.
                          writeAvroToGCS(message, bucket);
                        } else if (outputDataFormat.equalsIgnoreCase(PUBSUB_GCS_JSON_EXTENSION)) {
                          // JSON: parse the message payload and add it to the current batch.
                          JSONObject record = new JSONObject(new String(message.getData()));
                          jsonArr.put(record);
                          // Flush the batch once it reaches batchSize records.
                          if (jsonArr.length() == batchSize) {
                            writeJsonArrayToGCS(jsonArr, bucket);
                            jsonArr = new JSONArray();
                          }
                        }
                      } // end while
                      // Flush any remaining JSON records that did not fill a complete batch.
                      if (jsonArr.length() > 0
                          && outputDataFormat.equalsIgnoreCase(PUBSUB_GCS_JSON_EXTENSION)) {
                        writeJsonArrayToGCS(jsonArr, bucket);
                      }
                    } else {
                      LOGGER.error("Output format {} is not supported; expected Avro or JSON.", outputDataFormat);
                    }
                  }
                });
          }
        });
  }