in v2/jdbc-and-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/JdbcToBigQuery.java [123:293]
static PipelineResult run(JdbcToBigQueryOptions options, Write<TableRow> writeToBQ) {
  // Validate BQ STORAGE_WRITE_API options
  BigQueryIOUtils.validateBQStorageApiOptionsBatch(options);
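  // The deadletter table is populated only from Storage Write API failed-insert feedback
  // (Step 3 below), so a DLQ configured without the Storage Write API would never receive
  // rows; reject that combination up front.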
  if (!options.getUseStorageWriteApi()
      && !options.getUseStorageWriteApiAtLeastOnce()
      && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
    throw new IllegalArgumentException(
        "outputDeadletterTable can only be specified if BigQuery Storage Write API is enabled"
            + " either with useStorageWriteApi or useStorageWriteApiAtLeastOnce.");
  }

  // Create the pipeline
  Pipeline pipeline = Pipeline.create(options);
  /*
   * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
   *        2) Append TableRow to BigQuery via BigQueryIO
   *        3) If using the Storage Write API, route failed inserts to a deadletter table
   *           or fail the pipeline
   */
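  // connectionURL, username, and password may each be given as a Secret Manager reference
  // (resolved by maybeParseSecret) and/or be KMS-encrypted (decrypted by maybeDecrypt when
  // KMSEncryptionKey is set).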
  JdbcIO.DataSourceConfiguration dataSourceConfiguration =
      JdbcIO.DataSourceConfiguration.create(
              StaticValueProvider.of(options.getDriverClassName()),
              maybeDecrypt(
                  maybeParseSecret(options.getConnectionURL()), options.getKMSEncryptionKey()))
          .withUsername(
              maybeDecrypt(
                  maybeParseSecret(options.getUsername()), options.getKMSEncryptionKey()))
          .withPassword(
              maybeDecrypt(
                  maybeParseSecret(options.getPassword()), options.getKMSEncryptionKey()));
  if (options.getDriverJars() != null) {
    dataSourceConfiguration = dataSourceConfiguration.withDriverJars(options.getDriverJars());
  }
  if (options.getConnectionProperties() != null) {
    dataSourceConfiguration =
        dataSourceConfiguration.withConnectionProperties(options.getConnectionProperties());
  }
  /*
   * Step 1: Read records via JDBC and convert to TableRow
   * via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
   */
  PCollection<TableRow> rows;
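  // A partitioned read is used when both 'table' and 'partitionColumn' are set; otherwise
  // the pipeline falls back to the single-query read in the else branch below.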
  if (options.getPartitionColumn() != null && options.getTable() != null) {
    // Read with Partitions
    JdbcIO.ReadWithPartitions<TableRow, ?> readIO = null;
    final String partitionColumnType = options.getPartitionColumnType();
    if (partitionColumnType == null || "long".equals(partitionColumnType)) {
      JdbcIO.ReadWithPartitions<TableRow, Long> longTypeReadIO =
          JdbcIO.<TableRow, Long>readWithPartitions(TypeDescriptors.longs())
              .withDataSourceConfiguration(dataSourceConfiguration)
              .withTable(options.getTable())
              .withPartitionColumn(options.getPartitionColumn())
              .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
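      // If lowerBound/upperBound are not supplied, JdbcIO infers them by querying the
      // MIN/MAX of the partition column.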
      if (options.getLowerBound() != null && options.getUpperBound() != null) {
        // Parse lowerBound and upperBound as Long values.
        try {
          longTypeReadIO =
              longTypeReadIO
                  .withLowerBound(Long.valueOf(options.getLowerBound()))
                  .withUpperBound(Long.valueOf(options.getUpperBound()));
        } catch (NumberFormatException e) {
          throw new NumberFormatException(
              "Expected Long values for lowerBound and upperBound, received: "
                  + e.getMessage());
        }
      }
      readIO = longTypeReadIO;
} else if ("datetime".equals(partitionColumnType)) {
JdbcIO.ReadWithPartitions<TableRow, DateTime> dateTimeReadIO =
JdbcIO.<TableRow, DateTime>readWithPartitions(TypeDescriptor.of(DateTime.class))
.withDataSourceConfiguration(dataSourceConfiguration)
.withTable(options.getTable())
.withPartitionColumn(options.getPartitionColumn())
.withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
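      // Bounds must match the pattern below, e.g. "2024-01-01 00:00:00.000+0000";
      // withOffsetParsed() preserves the offset given in the string rather than
      // converting to the default time zone.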
      if (options.getLowerBound() != null && options.getUpperBound() != null) {
        DateTimeFormatter dateFormatter =
            DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSSZ").withOffsetParsed();
        // Parse lowerBound and upperBound as DateTime values.
        try {
          dateTimeReadIO =
              dateTimeReadIO
                  .withLowerBound(dateFormatter.parseDateTime(options.getLowerBound()))
                  .withUpperBound(dateFormatter.parseDateTime(options.getUpperBound()));
        } catch (IllegalArgumentException e) {
          throw new IllegalArgumentException(
              "Expected DateTime values in the format 'yyyy-MM-dd HH:mm:ss.SSSZ' for lowerBound"
                  + " and upperBound, received: "
                  + e.getMessage());
        }
      }
      readIO = dateTimeReadIO;
    } else {
      throw new IllegalStateException(
          "Received unsupported partitionColumnType: " + partitionColumnType);
    }
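    // Optional tuning: numPartitions controls how many parallel range queries are issued;
    // fetchSize sets the JDBC driver's result-set batch size (values <= 0 are ignored, so
    // the driver default applies).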
    if (options.getNumPartitions() != null) {
      readIO = readIO.withNumPartitions(options.getNumPartitions());
    }
    if (options.getFetchSize() != null && options.getFetchSize() > 0) {
      readIO = readIO.withFetchSize(options.getFetchSize());
    }
    rows = pipeline.apply("Read from JDBC with Partitions", readIO);
  } else {
    if (options.getQuery() == null) {
      throw new IllegalArgumentException(
          "Either 'query' or both 'table' AND 'partitionColumn' must be specified to read from"
              + " JDBC");
    }
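    // GCSAwareValueProvider resolves a query supplied as a gs:// file path by reading the
    // file contents at runtime; a plain SQL string passes through unchanged.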
    JdbcIO.Read<TableRow> readIO =
        JdbcIO.<TableRow>read()
            .withDataSourceConfiguration(dataSourceConfiguration)
            .withQuery(new GCSAwareValueProvider(options.getQuery()))
            .withCoder(TableRowJsonCoder.of())
            .withRowMapper(JdbcConverters.getResultSetToTableRow(options.getUseColumnAlias()));
    if (options.getFetchSize() != null && options.getFetchSize() > 0) {
      readIO = readIO.withFetchSize(options.getFetchSize());
    }
    rows = pipeline.apply("Read from JdbcIO", readIO);
  }
  /*
   * Step 2: Append TableRow to an existing BigQuery table
   */
  WriteResult writeResult = rows.apply("Write to BigQuery", writeToBQ);
  /*
   * Step 3.
   * If using Storage Write API, capture failed inserts and either
   *   a) write error rows to DLQ
   *   b) fail the pipeline
   */
  if (options.getUseStorageWriteApi() || options.getUseStorageWriteApiAtLeastOnce()) {
    PCollection<BigQueryInsertError> insertErrors =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options);
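    // insertErrors holds the rows that the Storage Write API failed to append, extracted
    // from the WriteResult above.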
    if (!Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
      /*
       * Step 3a.
       * Elements that failed to insert into BigQuery are extracted and converted to
       * FailsafeElement
       */
      PCollection<FailsafeElement<String, String>> failedInserts =
          insertErrors
              .apply(
                  "WrapInsertionErrors",
                  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                      .via((BigQueryInsertError e) -> wrapBigQueryInsertError(e)))
              .setCoder(FAILSAFE_ELEMENT_CODER);
      /*
       * Step 3a Contd.
       * Write records that failed insertion to the deadletter table
       */
      failedInserts.apply(
          "WriteFailedRecords",
          ErrorConverters.WriteStringMessageErrors.newBuilder()
              .setErrorRecordsTable(options.getOutputDeadletterTable())
              .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
              .setUseWindowedTimestamp(false)
              .build());
    } else {
      /*
       * Step 3b.
       * Fail the pipeline upon write errors if no DLQ was specified
       */
      insertErrors.apply(ParDo.of(new ThrowWriteErrorsDoFn()));
    }
  }
  // Execute the pipeline and return the result.
  return pipeline.run();
}