in projects/dataflow-kafka-to-bigquery/pipeline/src/main/java/com/google/cloud/solutions/dataflow/kafka2bigquery/Kafka2BigQueryPipeline.java [67:123]
/**
 * Assembles the Kafka-to-BigQuery pipeline graph and returns it ready to run.
 *
 * <p>Reads raw bytes from the configured Kafka topic, parses them with {@code
 * ParseKafkaMessageTransform}, appends successfully parsed rows to the main BigQuery table, and —
 * when a mismatch table spec is configured — routes schema-error messages to a dead-letter table.
 *
 * @return the fully wired {@link Pipeline}
 * @throws IOException if the dead-letter table schema resource cannot be read
 */
Pipeline setup() throws IOException {
  options.setAppName("cloud-solutions/dataflow-kafka-to-bigquery-v1");
  options.setUserAgent("cloud-solutions/dataflow-kafka-to-bigquery-v1");

  var messageParser =
      ParseKafkaMessageTransform.builder()
          .clockFactory(clockFactory)
          .protoClassName(options.getProtoClassName())
          .protoJarPath(options.getProtoJarFile())
          .topic(options.getKafkaTopic())
          .build();

  var kafkaSource =
      KafkaIO.readBytes()
          .withBootstrapServers(options.getKafkaBootstrapServers())
          .withTopic(options.getKafkaTopic())
          .withReadCommitted()
          .withConsumerConfigUpdates(createKafkaConsumerConfig())
          .withoutMetadata();

  var parsedMessages =
      pipeline.apply("ReadFromKafka", kafkaSource).apply("ParseMessages", messageParser);

  // Main output: append parsed rows to the pre-existing destination table (never auto-created).
  var mainTableWriter =
      bigqueryWriterFactory
          .get(TableRow.class)
          .to(options.getBigQueryTableSpec())
          .withCreateDisposition(CreateDisposition.CREATE_NEVER)
          .withWriteDisposition(WriteDisposition.WRITE_APPEND);
  parsedMessages.get(messageParser.getOutputTag()).apply("WriteToBQ", mainTableWriter);

  // Dead-letter branch is optional: wired only when a mismatch table spec is provided.
  if (!isNullOrEmpty(options.getMismatchedMessageBigQueryTableSpec())) {
    var deadletterSchema =
        Resources.toString(
            Resources.getResource("bigquery_kafkaschemaerror_table_schema.json"),
            StandardCharsets.UTF_8);
    var deadletterWriter =
        bigqueryWriterFactory
            .get(KafkaSchemaError.class)
            .to(options.getMismatchedMessageBigQueryTableSpec())
            .withFormatFunction(KafkaSchemaError.bigqueryFormatFunction())
            .withJsonSchema(deadletterSchema)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
    parsedMessages
        .get(messageParser.getSchemaErrorTag())
        .apply("WriteErrorMessages", deadletterWriter);
  }

  return pipeline;
}