in java/src/main/java/com/google/cloud/dataproc/templates/kafka/KafkaToBQDstream.java [83:149]
/**
 * Runs the Kafka-to-BigQuery direct-stream template: subscribes to a Kafka topic via the
 * Spark Streaming direct API, and for each micro-batch writes the (key, value) records to a
 * BigQuery table, committing Kafka offsets only after a successful write.
 *
 * @throws TimeoutException if the streaming context times out
 * @throws SQLException declared for interface compatibility with other templates
 * @throws InterruptedException if awaiting termination is interrupted
 */
public void runTemplate() throws TimeoutException, SQLException, InterruptedException {
  SparkSession spark =
      SparkSession.builder().appName("Kafka to BQ via Direct stream").getOrCreate();
  spark.sparkContext().setLogLevel(sparkLogLevel);

  // Kafka consumer configuration. Auto-commit is disabled so offsets are committed
  // manually (below) only after the batch has been persisted to BigQuery.
  Map<String, Object> kafkaParams = new HashMap<>();
  kafkaParams.put("bootstrap.servers", kafkaBootstrapServers);
  kafkaParams.put("key.deserializer", StringDeserializer.class);
  kafkaParams.put("value.deserializer", StringDeserializer.class);
  kafkaParams.put("group.id", kafkaGroupId);
  kafkaParams.put("auto.offset.reset", kafkaStartingOffsets);
  kafkaParams.put("enable.auto.commit", false);

  Collection<String> topics = Collections.singletonList(kafkaTopic);
  JavaSparkContext sparkContext = JavaSparkContext.fromSparkContext(spark.sparkContext());
  JavaStreamingContext ssc = new JavaStreamingContext(sparkContext, new Duration(batchInterval));

  JavaInputDStream<ConsumerRecord<Object, Object>> stream =
      KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent(),
          ConsumerStrategies.Subscribe(topics, kafkaParams));

  stream.foreachRDD(
      (VoidFunction2<JavaRDD<ConsumerRecord<Object, Object>>, Time>)
          (rdd, time) -> {
            LOGGER.debug("Reading kafka data");
            // Capture the offset ranges of this batch before any transformation,
            // so they can be committed once the write succeeds.
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            // Both deserializers are StringDeserializer, so key/value are Strings at
            // runtime; the explicit casts replace the previous unchecked raw Tuple2.
            JavaRDD<Tuple2<String, String>> recordRdd =
                rdd.map(record -> new Tuple2<>((String) record.key(), (String) record.value()));
            Dataset<Row> rowDataset =
                spark
                    .createDataset(
                        recordRdd.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
                    .withColumnRenamed("_1", "key")
                    .withColumnRenamed("_2", "value");
            if (!rowDataset.isEmpty()) {
              LOGGER.info("Saving data into BQ");
              rowDataset
                  .write()
                  .mode(bqWriteMode)
                  .format("bigquery")
                  .option(
                      KAFKA_BQ_SPARK_CONF_NAME_TABLE,
                      projectId + "." + bigQueryDataset + "." + bigQueryTable)
                  .option(KAFKA_BQ_SPARK_CONF_NAME_TEMP_GCS_BUCKET, tempGcsBucket)
                  .option(KAFKA_BQ_SPARK_CONF_NAME_OUTPUT_HEADER, true)
                  .save();
              LOGGER.info("Saved to BQ");
            }
            // Commit offsets only after a successful write; if save() throws, the
            // exception propagates and the offsets remain uncommitted for replay.
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
          });

  ssc.start();
  // NOTE(review): the context is not stopped after the timeout elapses — presumably
  // the driver JVM exits at this point; confirm against the template's caller.
  ssc.awaitTerminationOrTimeout(kafkaAwaitTerminationTimeout);
}