public static WriteResult processKafkaRecords()

in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java [154:204]


  public static WriteResult processKafkaRecords(
      PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {

    // Validate the pipeline options for MessageFormat and SchemaFormat.
    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (options.getBinaryAvroSchemaPath() != null
            && options.getBinaryAvroSchemaPath().isBlank())) {
      throw new IllegalArgumentException(
          "Binary Avro Schema Path cannot be empty for AVRO_BINARY_ENCODING.");
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) {

      if ((options.getSchemaRegistryConnectionUrl() != null
              && options.getSchemaRegistryConnectionUrl().isBlank())
          && (options.getConfluentAvroSchemaPath() != null
              && options.getConfluentAvroSchemaPath().isBlank())) {
        throw new IllegalArgumentException(
            "Either Schema Registry Connection URL or Confluent Avro Schema Path must be provided for AVRO_CONFLUENT_WIRE_FORMAT.");
      }

      if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
        if (!options.getConfluentAvroSchemaPath().isBlank()
            && (options.getOutputTableSpec() != null && options.getOutputTableSpec().isBlank())) {
          throw new IllegalArgumentException(
              "The outputTableSpec parameter is required when using the SINGLE_SCHEMA_FILE schema format.");
        }
      } else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
        if (options.getSchemaRegistryConnectionUrl() != null
            && (options.getOutputDataset() != null && options.getOutputDataset().isBlank())) {
          throw new IllegalArgumentException(
              "An output BigQuery dataset is required. It will be used to create tables per schema.");
        }
      } else {
        throw new IllegalArgumentException(
            "Unsupported schemaFormat parameter value: " + options.getSchemaFormat());
      }
    }

    if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
        && (!options.getBinaryAvroSchemaPath().isBlank())) {
      return handleAvroBinaryEncoding(kafkaRecords, options);
    } else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
        && (!options.getSchemaRegistryConnectionUrl().isBlank()
            || !options.getConfluentAvroSchemaPath().isBlank())) {
      return handleAvroConfluentWireFormat(kafkaRecords, options);
    } else {
      throw new IllegalArgumentException(
          "Message format " + options.getMessageFormat() + " is unsupported.");
    }
  }