in v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/TextToBigQueryStreaming.java [299:444]
public static PipelineResult run(TextToBigQueryStreamingOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
    // Register the FailsafeElement coder used throughout the pipeline.
FailsafeElementCoder<String, String> coder =
FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
CoderRegistry coderRegistry = pipeline.getCoderRegistry();
coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);
    // Determine whether a JavaScript UDF or a Python UDF is configured; at most one may be set.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Only one of javascriptTextTransformGcsPath or pythonExternalTextTransformGcsPath may be"
              + " provided, not both.");
    }
    /*
     * Steps:
     *  1) Read from the text source continuously.
     *  2) Convert to FailsafeElement.
     *  3) Apply the UDF transformation (JavaScript or Python).
     *     - Tag records that were successfully transformed and those
     *       that failed transformation.
     *  4) Convert records to TableRow.
     *     - Tag records that were successfully converted and those
     *       that failed conversion.
     *  5) Insert successfully converted records into BigQuery.
     *     - Errors encountered while streaming are sent to the deadletter table.
     *  6) Insert records that failed transformation or conversion into the deadletter table.
     */
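    // watchForNewFiles turns the bounded TextIO read into an unbounded source that polls the
    // input pattern at DEFAULT_POLL_INTERVAL; Growth.never() means the watch never terminates,
    // so the pipeline runs as a streaming job.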
    PCollection<String> sourceRead =
        pipeline.apply(
            "ReadFromSource",
            TextIO.read()
                .from(options.getInputFilePattern())
                .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()));
PCollectionTuple transformedOutput;
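    // The two UDF paths differ in where the user code executes: the Python UDF is invoked
    // through a cross-language external transform, while the JavaScript UDF is evaluated
    // in-process on the Java workers. Both paths tag successes with UDF_OUT and failures
    // with UDF_DEADLETTER_OUT.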
if (usePythonUdf) {
transformedOutput =
sourceRead
.apply(
"MapToRecord",
PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
.stringMappingFunction())
.setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
.apply(
"InvokeUDF",
PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
.setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
.setFunctionName(options.getPythonExternalTextTransformFunctionName())
.build())
.apply(
ParDo.of(new RowToStringFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
.withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));
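      // The file at pythonExternalTextTransformGcsPath is expected to define the function
      // named by pythonExternalTextTransformFunctionName, taking one input line and returning
      // the transformed JSON string. A hypothetical example (not shipped with the template):
      //
      //   import json
      //
      //   def process(line):
      //       fields = line.split(',')
      //       return json.dumps({'name': fields[0], 'age': fields[1]})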
    } else {
      transformedOutput =
          // 1) Reuse the continuous text source read above rather than reading again.
          sourceRead
// 2) Convert to FailsafeElement.
.apply(
"ConvertToFailsafeElement",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(input -> FailsafeElement.of(input, input)))
// 3) Apply Javascript udf transformation.
.apply(
"ApplyUDFTransformation",
FailsafeJavascriptUdf.<String>newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.setReloadIntervalMinutes(
options.getJavascriptTextTransformReloadIntervalMinutes())
.setSuccessTag(UDF_OUT)
.setFailureTag(UDF_DEADLETTER_OUT)
.build());
}
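    // For reference, the JavaScript UDF receives one input line and returns a JSON string
    // matching the output table schema. A hypothetical example:
    //
    //   function transform(line) {
    //     var fields = line.split(',');
    //     return JSON.stringify({name: fields[0], age: fields[1]});
    //   }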
PCollectionTuple convertedTableRows =
transformedOutput
// 4) Convert records to TableRow.
.get(UDF_OUT)
.apply(
"ConvertJSONToTableRow",
FailsafeJsonToTableRow.<String>newBuilder()
.setSuccessTag(TRANSFORM_OUT)
.setFailureTag(TRANSFORM_DEADLETTER_OUT)
.build());
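    // Streaming inserts with withExtendedErrorInfo() surface per-row insert errors, and
    // retryTransientErrors() retries retriable failures while routing permanent ones
    // (e.g. schema mismatches) to the failed-inserts output consumed below.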
WriteResult writeResult =
convertedTableRows
// 5) Insert successfully converted records into BigQuery.
.get(TRANSFORM_OUT)
.apply(
"InsertIntoBigQuery",
BigQueryIO.writeTableRows()
.withJsonSchema(GCSUtils.getGcsFileAsString(options.getJSONPath()))
.to(options.getOutputTable())
.withExtendedErrorInfo()
.withoutValidation()
.withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
.withCustomGcsTempLocation(
StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory())));
    // Elements that failed to insert into BigQuery are extracted and converted to FailsafeElements.
PCollection<FailsafeElement<String, String>> failedInserts =
BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
.apply(
"WrapInsertionErrors",
MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
.via(TextToBigQueryStreaming::wrapBigQueryInsertError));
    // 6) Insert records that failed transformation or conversion into the deadletter table.
PCollectionList.of(
ImmutableList.of(
transformedOutput.get(UDF_DEADLETTER_OUT),
convertedTableRows.get(TRANSFORM_DEADLETTER_OUT),
failedInserts))
.apply("Flatten", Flatten.pCollections())
.apply(
"WriteFailedRecords",
WriteStringMessageErrors.newBuilder()
.setErrorRecordsTable(
StringUtils.isNotEmpty(options.getOutputDeadletterTable())
? options.getOutputDeadletterTable()
: options.getOutputTable() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
.setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
.build());
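    // run() is non-blocking for streaming pipelines; the returned PipelineResult is a handle
    // the caller can use to monitor or cancel the job.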
return pipeline.run();
}
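
  // A minimal sketch (assumed; the actual entry point sits outside this excerpt) of how
  // run(options) is typically invoked from main(). PipelineOptionsFactory parsing is the
  // standard Beam pattern, but the exact main() body here is an assumption:
  //
  //   public static void main(String[] args) {
  //     TextToBigQueryStreamingOptions options =
  //         PipelineOptionsFactory.fromArgs(args)
  //             .withValidation()
  //             .as(TextToBigQueryStreamingOptions.class);
  //     run(options);
  //   }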