public static PipelineResult runAvroPipeline()

in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java [465:603]


  public static PipelineResult runAvroPipeline(
      Pipeline pipeline,
      KafkaToBQOptions options,
      List<String> topicsList,
      String bootstrapServers,
      Map<String, Object> kafkaConfig) {

    String avroSchema = options.getAvroSchemaPath();
    Schema schema = SchemaUtils.getAvroSchema(avroSchema);
    PCollectionTuple convertedTableRows;
    PCollection<KV<byte[], GenericRecord>> genericRecords;

    genericRecords =
        pipeline
            /*
             * Step #1: Read messages in from Kafka
             */
            .apply(
                "ReadFromKafka",
                KafkaTransform.readAvroFromKafka(
                    bootstrapServers, topicsList, kafkaConfig, avroSchema, false))
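            // Attach an explicit coder: byte[] keys plus a SchemaCoder built from the
            // Avro schema, so the GenericRecord values carry a Beam Row schema that
            // downstream transforms (e.g. Convert.toRows) can use.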
            .setCoder(
                KvCoder.of(
                    ByteArrayCoder.of(),
                    SchemaCoder.of(
                        AvroUtils.toBeamSchema(schema),
                        TypeDescriptor.of(GenericRecord.class),
                        AvroUtils.getToRowFunction(GenericRecord.class, schema),
                        AvroUtils.getFromRowFunction(GenericRecord.class))));

    /*
     * Step #2: Transform the Kafka Messages into TableRows
     */

    WriteResult writeResult;
    // If a JavaScript UDF is configured, convert the Avro GenericRecords to JSON and apply the UDF.
    if (StringUtils.isNotEmpty(options.getJavascriptTextTransformGcsPath())
        && StringUtils.isNotEmpty(options.getJavascriptTextTransformFunctionName())) {
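      // Decode the key as a UTF-8 string and render the GenericRecord as its JSON
      // representation so the string-based UDF and TableRow converter can process it.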
      convertedTableRows =
          new StringMessageToTableRow(options)
              .expand(
                  genericRecords.apply(
                      "ConvertToJson",
                      MapElements.into(
                              TypeDescriptors.kvs(
                                  TypeDescriptors.strings(), TypeDescriptors.strings()))
                          .via(
                              kv ->
                                  KV.of(
                                      new String(kv.getKey(), StandardCharsets.UTF_8),
                                      Objects.requireNonNull(kv.getValue()).toString()))));
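
      /*
       * Step #3: Write the successful records out to BigQuery
       */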
      writeResult =
          convertedTableRows
              .get(TRANSFORM_OUT)
              .apply(
                  "WriteSuccessfulRecords",
                  BigQueryIO.writeTableRows()
                      .withoutValidation()
                      .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                      .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                      .withExtendedErrorInfo()
                      .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                      .to(options.getOutputTableSpec()));

      /*
       * Step #3 Contd.
       * Elements whose BigQuery inserts failed are extracted and converted to FailsafeElements.
       */
      PCollection<FailsafeElement<String, String>> failedInserts =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
              .apply(
                  "WrapInsertionErrors",
                  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                      .via(KafkaToBigQuery::wrapBigQueryInsertError))
              .setCoder(FAILSAFE_ELEMENT_CODER);

      /*
       * Step #4: Write records that failed UDF or TableRow conversion to the deadletter table
       */
      PCollectionList.of(convertedTableRows.get(UDF_DEADLETTER_OUT))
          .and(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
          .apply("Flatten", Flatten.pCollections())
          .apply(
              "WriteTransformationFailedRecords",
              WriteStringKVMessageErrors.newBuilder()
                  .setErrorRecordsTable(
                      ObjectUtils.firstNonNull(
                          options.getOutputDeadletterTable(),
                          options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
                  .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
                  .build());

      /*
       * Step #5: Write records whose BigQuery inserts failed to the deadletter table.
       */
      failedInserts.apply(
          "WriteInsertionFailedRecords",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(
                  ObjectUtils.firstNonNull(
                      options.getOutputDeadletterTable(),
                      options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());
    } else {
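      /*
       * Step #3: Write records to BigQuery
       *
       * Without a UDF, the Avro GenericRecords are converted directly to Beam Rows
       * and written with a schema-aware BigQuery write transform.
       */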
      writeResult =
          genericRecords
              .apply(Values.create())
              .apply(Convert.toRows())
              .apply(BigQueryConverters.<Row>createWriteTransform(options).useBeamSchema());

      /*
       * Step #3 Contd.
       * Elements whose BigQuery inserts failed are extracted and converted to FailsafeElements.
       */
      PCollection<FailsafeElement<String, String>> failedInserts =
          BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
              .apply(
                  "WrapInsertionErrors",
                  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                      .via(KafkaToBigQuery::wrapBigQueryInsertError))
              .setCoder(FAILSAFE_ELEMENT_CODER);

      /*
       * Step #5: Write records whose BigQuery inserts failed to the deadletter table.
       */
      failedInserts.apply(
          "WriteInsertionFailedRecords",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(
                  ObjectUtils.firstNonNull(
                      options.getOutputDeadletterTable(),
                      options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
              .setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
              .build());
    }

    return pipeline.run();
  }
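
For context, a minimal sketch of how a caller might drive this method. The topic name, broker address, empty consumer config, the example class name, and the assumption that KafkaToBQOptions is nested inside KafkaToBigQuery are illustrative, not taken from the template's actual entry point.

import com.google.cloud.teleport.v2.templates.KafkaToBigQuery;
import com.google.cloud.teleport.v2.templates.KafkaToBigQuery.KafkaToBQOptions; // assumed location of the options interface
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class KafkaToBigQueryAvroExample {

  public static void main(String[] args) {
    KafkaToBQOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToBQOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    // Illustrative placeholders; the real template resolves these from its options.
    List<String> topics = Arrays.asList("my-topic");
    String bootstrapServers = "broker-1:9092";
    Map<String, Object> kafkaConfig = new HashMap<>();

    PipelineResult result =
        KafkaToBigQuery.runAvroPipeline(pipeline, options, topics, bootstrapServers, kafkaConfig);
    result.waitUntilFinish();
  }
}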