Pipeline setup()

in projects/dataflow-kafka-to-bigquery/pipeline/src/main/java/com/google/cloud/solutions/dataflow/kafka2bigquery/Kafka2BigQueryPipeline.java [67:123]


  /**
   * Builds the Kafka-to-BigQuery transform graph on {@code pipeline}.
   *
   * <p>Byte records are read from the configured Kafka topic and parsed with a {@link
   * ParseKafkaMessageTransform}. Successfully parsed rows are appended to the main BigQuery table,
   * which is never created by this pipeline. When a mismatched-message table spec is configured,
   * records emitted on the parser's schema-error tag are also appended to that dead-letter table.
   * That table is created from the bundled {@code bigquery_kafkaschemaerror_table_schema.json}
   * schema if it does not exist.
   *
   * @return the same {@code pipeline} instance, with the transforms applied
   * @throws IOException if reading the dead-letter table schema resource fails
   */
  Pipeline setup() throws IOException {
    // The same identifier is reported both as the application name and as the user agent.
    final String solutionTag = "cloud-solutions/dataflow-kafka-to-bigquery-v1";
    options.setAppName(solutionTag);
    options.setUserAgent(solutionTag);

    var parser =
        ParseKafkaMessageTransform.builder()
            .clockFactory(clockFactory)
            .protoClassName(options.getProtoClassName())
            .protoJarPath(options.getProtoJarFile())
            .topic(options.getKafkaTopic())
            .build();

    // Raw key/value byte records, consumed with read-committed isolation and without Kafka
    // metadata.
    var kafkaSource =
        KafkaIO.readBytes()
            .withBootstrapServers(options.getKafkaBootstrapServers())
            .withTopic(options.getKafkaTopic())
            .withReadCommitted()
            .withConsumerConfigUpdates(createKafkaConsumerConfig())
            .withoutMetadata();

    var parsedOutputs =
        pipeline.apply("ReadFromKafka", kafkaSource).apply("ParseMessages", parser);

    // Main sink: the destination table must already exist (CREATE_NEVER).
    var parsedRows = parsedOutputs.get(parser.getOutputTag());
    parsedRows.apply(
        "WriteToBQ",
        bigqueryWriterFactory
            .get(TableRow.class)
            .to(options.getBigQueryTableSpec())
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));

    var deadLetterTableSpec = options.getMismatchedMessageBigQueryTableSpec();
    if (isNullOrEmpty(deadLetterTableSpec)) {
      // No dead-letter table configured: this method does not write the schema-error output.
      return pipeline;
    }

    // Dead-letter sink for records that did not match the expected schema; the table may be
    // created on demand, so its schema is loaded from the bundled resource.
    var schemaErrorTableSchemaJson =
        Resources.toString(
            Resources.getResource("bigquery_kafkaschemaerror_table_schema.json"),
            StandardCharsets.UTF_8);

    parsedOutputs
        .get(parser.getSchemaErrorTag())
        .apply(
            "WriteErrorMessages",
            bigqueryWriterFactory
                .get(KafkaSchemaError.class)
                .to(deadLetterTableSpec)
                .withFormatFunction(KafkaSchemaError.bigqueryFormatFunction())
                .withJsonSchema(schemaErrorTableSchemaJson)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));

    return pipeline;
  }