in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQueryFlex.java [154:204]
public static WriteResult processKafkaRecords(
PCollection<KafkaRecord<byte[], byte[]>> kafkaRecords, KafkaToBigQueryFlexOptions options) {
// Validate the pipeline options for MessageFormat and SchemaFormat.
if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
&& (options.getBinaryAvroSchemaPath() != null
&& options.getBinaryAvroSchemaPath().isBlank())) {
throw new IllegalArgumentException(
"Binary Avro Schema Path cannot be empty for AVRO_BINARY_ENCODING.");
}
if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)) {
if ((options.getSchemaRegistryConnectionUrl() != null
&& options.getSchemaRegistryConnectionUrl().isBlank())
&& (options.getConfluentAvroSchemaPath() != null
&& options.getConfluentAvroSchemaPath().isBlank())) {
throw new IllegalArgumentException(
"Either Schema Registry Connection URL or Confluent Avro Schema Path must be provided for AVRO_CONFLUENT_WIRE_FORMAT.");
}
if (options.getSchemaFormat().equals(SchemaFormat.SINGLE_SCHEMA_FILE)) {
if (!options.getConfluentAvroSchemaPath().isBlank()
&& (options.getOutputTableSpec() != null && options.getOutputTableSpec().isBlank())) {
throw new IllegalArgumentException(
"The outputTableSpec parameter is required when using the SINGLE_SCHEMA_FILE schema format.");
}
} else if (options.getSchemaFormat().equals(SchemaFormat.SCHEMA_REGISTRY)) {
if (options.getSchemaRegistryConnectionUrl() != null
&& (options.getOutputDataset() != null && options.getOutputDataset().isBlank())) {
throw new IllegalArgumentException(
"An output BigQuery dataset is required. It will be used to create tables per schema.");
}
} else {
throw new IllegalArgumentException(
"Unsupported schemaFormat parameter value: " + options.getSchemaFormat());
}
}
if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_BINARY_ENCODING)
&& (!options.getBinaryAvroSchemaPath().isBlank())) {
return handleAvroBinaryEncoding(kafkaRecords, options);
} else if (options.getMessageFormat().equals(MessageFormatConstants.AVRO_CONFLUENT_WIRE_FORMAT)
&& (!options.getSchemaRegistryConnectionUrl().isBlank()
|| !options.getConfluentAvroSchemaPath().isBlank())) {
return handleAvroConfluentWireFormat(kafkaRecords, options);
} else {
throw new IllegalArgumentException(
"Message format " + options.getMessageFormat() + " is unsupported.");
}
}