public static void writeToGCS()

in java/src/main/java/com/google/cloud/dataproc/templates/pubsub/PubSubToGCS.java [152:196]


  /**
   * Writes the messages of every RDD in the Pub/Sub stream to GCS, one partition at a time.
   *
   * <p>The output format is matched case-insensitively against {@code PUBSUB_GCS_AVRO_EXTENSION}
   * and {@code PUBSUB_GCS_JSON_EXTENSION}:
   *
   * <ul>
   *   <li>Avro: each message is handed individually to {@code writeAvroToGCS}.
   *   <li>JSON: each message payload is decoded as UTF-8, parsed into a {@code JSONObject} and
   *       buffered; the buffer is passed to {@code writeJsonArrayToGCS} whenever it holds exactly
   *       {@code batchSize} records, and once more at the end of the partition if records remain.
   *   <li>Anything else: an error is logged and the partition's messages are not read or written.
   * </ul>
   *
   * @param pubSubStream stream of Pub/Sub messages to persist
   * @param gcsBucketName not used by this method; retained so existing callers keep compiling
   * @param batchSize number of JSON records buffered before a write is issued; only consulted for
   *     JSON output
   * @param outputDataFormat requested output format
   * @param bucket bucket handle forwarded to the write helpers
   */
  public static void writeToGCS(
      JavaDStream<SparkPubsubMessage> pubSubStream,
      String gcsBucketName,
      Integer batchSize,
      String outputDataFormat,
      Bucket bucket) {
    pubSubStream.foreachRDD(
        new VoidFunction<JavaRDD<SparkPubsubMessage>>() {
          @Override
          public void call(JavaRDD<SparkPubsubMessage> sparkPubsubMessageJavaRDD) throws Exception {
            sparkPubsubMessageJavaRDD.foreachPartition(
                new VoidFunction<Iterator<SparkPubsubMessage>>() {
                  @Override
                  public void call(Iterator<SparkPubsubMessage> sparkPubsubMessageIterator)
                      throws Exception {
                    // The format never changes while a partition is processed, so resolve it
                    // once here rather than re-comparing strings for every message.
                    final boolean avroOutput =
                        outputDataFormat.equalsIgnoreCase(PUBSUB_GCS_AVRO_EXTENSION);
                    final boolean jsonOutput =
                        outputDataFormat.equalsIgnoreCase(PUBSUB_GCS_JSON_EXTENSION);

                    if (!avroOutput && !jsonOutput) {
                      LOGGER.error(outputDataFormat + " is not supported...");
                      return;
                    }

                    if (avroOutput) {
                      // Avro records are written one message at a time; no batching applies.
                      while (sparkPubsubMessageIterator.hasNext()) {
                        writeAvroToGCS(sparkPubsubMessageIterator.next(), bucket);
                      }
                      return;
                    }

                    // JSON output: accumulate parsed records and flush in batches.
                    JSONArray pendingRecords = new JSONArray();
                    while (sparkPubsubMessageIterator.hasNext()) {
                      SparkPubsubMessage message = sparkPubsubMessageIterator.next();
                      // Decode with an explicit charset: relying on the JVM default would corrupt
                      // non-ASCII payloads on executors whose default charset is not UTF-8.
                      String payload =
                          new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8);
                      pendingRecords.put(new JSONObject(payload));
                      // A record was just added, so the buffer is known to be non-empty here.
                      if (pendingRecords.length() == batchSize) {
                        writeJsonArrayToGCS(pendingRecords, bucket);
                        pendingRecords = new JSONArray();
                      }
                    }

                    // Flush whatever is left over from the last, partially filled batch.
                    if (pendingRecords.length() > 0) {
                      writeJsonArrayToGCS(pendingRecords, bucket);
                    }
                  }
                });
          }
        });
  }