public void runTemplate()

in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToBQDstream.java [83:149]


  public void runTemplate() throws TimeoutException, SQLException, InterruptedException {

    SparkSession spark =
        SparkSession.builder().appName("Kafka to BQ via Direct stream").getOrCreate();

    spark.sparkContext().setLogLevel(sparkLogLevel);

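    // Kafka consumer configuration; auto-commit is disabled so offsets are
    // committed back manually only after each micro-batch has been written out.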
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", kafkaBootstrapServers);
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", kafkaGroupId);
    kafkaParams.put("auto.offset.reset", kafkaStartingOffsets);
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Collections.singletonList(kafkaTopic);

    JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());

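    // Micro-batch streaming context; Spark's Duration interprets batchInterval as milliseconds.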
    JavaStreamingContext ssc = new JavaStreamingContext(sparkContext, new Duration(batchInterval));

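    // Direct Kafka stream: PreferConsistent spreads partitions evenly across executors,
    // and Subscribe consumes the configured topic with the consumer settings above.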
    JavaInputDStream<ConsumerRecord<Object, Object>> stream =
        KafkaUtils.createDirectStream(
            ssc,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.Subscribe(topics, kafkaParams));

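    // Process each micro-batch RDD and write its records to BigQuery.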
    stream.foreachRDD(
        (VoidFunction2<JavaRDD<ConsumerRecord<Object, Object>>, Time>)
            (rdd, time) -> {
              LOGGER.debug("Reading kafka data");

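              // Capture the Kafka offset ranges backing this batch so they can be
              // committed once the batch has been persisted.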
              OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

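              // Map each ConsumerRecord to a (key, value) tuple.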
              JavaRDD<Tuple2<String, String>> recordRdd =
                  rdd.map(record -> new Tuple2<>((String) record.key(), (String) record.value()));

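              // Convert the tuples into a Dataset with "key" and "value" columns.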
              Dataset<Row> rowDataset =
                  spark
                      .createDataset(
                          recordRdd.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                      .withColumnRenamed("_1", "key")
                      .withColumnRenamed("_2", "value");

              if (!rowDataset.isEmpty()) {

                LOGGER.info("Saving data into BQ");

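                // Write the batch through the spark-bigquery connector; the temp GCS bucket
                // option appears to back the connector's indirect (GCS staging) write path.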
                rowDataset
                    .write()
                    .mode(bqWriteMode)
                    .format("bigquery")
                    .option(
                        KAFKA_BQ_SPARK_CONF_NAME_TABLE,
                        projectId + "." + bigQueryDataset + "." + bigQueryTable)
                    .option(KAFKA_BQ_SPARK_CONF_NAME_TEMP_GCS_BUCKET, tempGcsBucket)
                    .option(KAFKA_BQ_SPARK_CONF_NAME_OUTPUT_HEADER, true)
                    .save();

                LOGGER.info("Saved to BQ");
              }
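              // Commit the processed offsets back to Kafka only after the batch has been
              // written, giving at-least-once delivery semantics.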
              ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
            });

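    // Start the streaming job and block until it terminates or the configured timeout elapses.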
    ssc.start();
    ssc.awaitTerminationOrTimeout(kafkaAwaitTerminationTimeout);
  }