in v2/kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java [465:603]
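/**
 * Runs the Avro variant of the Kafka-to-BigQuery pipeline: reads Avro records from the
 * given Kafka topics, optionally applies a JavaScript UDF, writes the resulting rows to
 * BigQuery, and routes UDF, transform, and insert failures to a deadletter table.
 */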
public static PipelineResult runAvroPipeline(
Pipeline pipeline,
KafkaToBQOptions options,
List<String> topicsList,
String bootstrapServers,
Map<String, Object> kafkaConfig) {
String avroSchemaPath = options.getAvroSchemaPath();
Schema schema = SchemaUtils.getAvroSchema(avroSchemaPath);
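// Attach a schema-aware coder so the Avro GenericRecords carry a Beam schema and can
// be converted to Rows downstream (see Convert.toRows in the no-UDF branch).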
PCollection<KV<byte[], GenericRecord>> genericRecords =
pipeline
/*
* Step #1: Read messages in from Kafka
*/
.apply(
"ReadFromKafka",
KafkaTransform.readAvroFromKafka(
bootstrapServers, topicsList, kafkaConfig, avroSchemaPath, false))
.setCoder(
KvCoder.of(
ByteArrayCoder.of(),
SchemaCoder.of(
AvroUtils.toBeamSchema(schema),
TypeDescriptor.of(GenericRecord.class),
AvroUtils.getToRowFunction(GenericRecord.class, schema),
AvroUtils.getFromRowFunction(GenericRecord.class))));
/*
 * Step #2: Transform the Kafka messages into TableRows
 */
WriteResult writeResult;
// If a JavaScript UDF is configured, convert the GenericRecords to JSON strings and apply the UDF.
if (StringUtils.isNotEmpty(options.getJavascriptTextTransformGcsPath())
&& StringUtils.isNotEmpty(options.getJavascriptTextTransformFunctionName())) {
PCollectionTuple convertedTableRows =
genericRecords
.apply(
"ConvertToJson",
MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(), TypeDescriptors.strings()))
.via(
kv ->
KV.of(
// Kafka message keys may be null; fall back to an empty string.
kv.getKey() == null
? ""
: new String(kv.getKey(), StandardCharsets.UTF_8),
Objects.requireNonNull(kv.getValue()).toString())))
.apply("TransformMessagesToTableRows", new StringMessageToTableRow(options));
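/*
 * Step #3: Write the successful records out to BigQuery
 */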
writeResult =
convertedTableRows
.get(TRANSFORM_OUT)
.apply(
"WriteSuccessfulRecords",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withExtendedErrorInfo()
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.to(options.getOutputTableSpec()));
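// Rows that fail with non-transient errors are not retried; they surface via the
// WriteResult and are routed to the deadletter table in Step #5.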
/*
 * Step #3 Contd.
 * Elements that failed to insert into BigQuery are extracted and converted to FailsafeElements.
 */
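// wrapBigQueryInsertError wraps each failed row and its error details in a
// FailsafeElement so it matches the deadletter table schema used below.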
PCollection<FailsafeElement<String, String>> failedInserts =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(KafkaToBigQuery::wrapBigQueryInsertError))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
 * Step #4: Write records that failed the UDF or the TableRow transform out to a deadletter table
 */
PCollectionList.of(convertedTableRows.get(UDF_DEADLETTER_OUT))
.and(convertedTableRows.get(TRANSFORM_DEADLETTER_OUT))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteTransformationFailedRecords",
WriteStringKVMessageErrors.newBuilder()
.setErrorRecordsTable(
ObjectUtils.firstNonNull(
options.getOutputDeadletterTable(),
options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
.build());
/*
 * Step #5: Write records that failed to insert into BigQuery to a deadletter table.
 */
failedInserts.apply(
"WriteInsertionFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ObjectUtils.firstNonNull(
options.getOutputDeadletterTable(),
options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
.build());
} else {
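/*
 * Step #3: Write the records to BigQuery directly, using the Beam schema derived
 * from the Avro schema (no UDF configured).
 */
// Values.create() drops the Kafka message keys; Convert.toRows() relies on the schema
// attached via setCoder above, and useBeamSchema() lets BigQueryIO derive the
// destination table schema from the Beam Row schema.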
writeResult =
genericRecords
.apply(Values.create())
.apply(Convert.toRows())
.apply(BigQueryConverters.<Row>createWriteTransform(options).useBeamSchema());
/*
 * Step #3 Contd.
 * Elements that failed to insert into BigQuery are extracted and converted to FailsafeElements.
 */
PCollection<FailsafeElement<String, String>> failedInserts =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(KafkaToBigQuery::wrapBigQueryInsertError))
.setCoder(FAILSAFE_ELEMENT_CODER);
/*
 * Step #5: Write records that failed to insert into BigQuery to a deadletter table.
 */
failedInserts.apply(
"WriteInsertionFailedRecords",
ErrorConverters.WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
ObjectUtils.firstNonNull(
options.getOutputDeadletterTable(),
options.getOutputTableSpec() + DEFAULT_DEADLETTER_TABLE_SUFFIX))
.setErrorRecordsTableSchema(SchemaUtils.DEADLETTER_SCHEMA)
.build());
}
return pipeline.run();
}
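
For orientation, here is a minimal sketch of how a caller might drive runAvroPipeline. The topic list, bootstrap servers, and consumer config below are illustrative placeholders, not the template's actual option wiring; the real template assembles these values in its main() from KafkaToBQOptions.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunAvroPipelineExample {
  public static void main(String[] args) {
    // Parse and validate pipeline options from the command line.
    KafkaToBQOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(KafkaToBQOptions.class);
    Pipeline pipeline = Pipeline.create(options);

    // Illustrative placeholders; the real template derives these from pipeline options.
    List<String> topics = Arrays.asList("orders", "payments");
    String bootstrapServers = "broker-1:9092";
    Map<String, Object> kafkaConfig = new HashMap<>(); // e.g. group.id, auto.offset.reset

    // Build and run the Avro pipeline, then block until it finishes.
    KafkaToBigQuery.runAvroPipeline(pipeline, options, topics, bootstrapServers, kafkaConfig)
        .waitUntilFinish();
  }
}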