in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java [617:927]
/**
 * Reconstructs a {@link BigQueryIO.Write} transform from its configuration {@link Row}.
 *
 * <p>Applies version-specific compatibility shims for pipelines upgraded from older Beam
 * releases (see the inline comments keyed off {@code updateCompatibilityBeamVersion}).
 *
 * @param configRow the serialized transform configuration produced by {@code toConfigRow}
 * @param options pipeline options; {@code StreamingOptions.getUpdateCompatibilityVersion()}
 *     selects which compatibility shims apply
 * @return the reconstructed {@code Write} transform
 * @throws RuntimeException if any serialized component cannot be deserialized
 */
public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
  String updateCompatibilityBeamVersion =
      options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
  // We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
  // is not correctly passed in for pipelines that use Beam 2.53.0.
  // This is fixed for Beam 2.54.0 and later.
  updateCompatibilityBeamVersion =
      (updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";
  try {
    BigQueryIO.Write.Builder builder = new AutoValue_BigQueryIO_Write.Builder<>();
    String jsonTableRef = configRow.getString("json_table_ref");
    if (jsonTableRef != null) {
      builder = builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
    }
    byte[] tableFunctionBytes = configRow.getBytes("table_function");
    if (tableFunctionBytes != null) {
      builder =
          builder.setTableFunction(
              (SerializableFunction<ValueInSingleWindow, TableDestination>)
                  fromByteArray(tableFunctionBytes));
    }
    byte[] formatFunctionBytes = configRow.getBytes("format_function");
    if (formatFunctionBytes != null) {
      builder =
          builder.setFormatFunction(
              (SerializableFunction<?, TableRow>) fromByteArray(formatFunctionBytes));
    }
    byte[] formatRecordOnFailureFunctionBytes =
        configRow.getBytes("format_record_on_failure_function");
    if (formatRecordOnFailureFunctionBytes != null) {
      builder =
          builder.setFormatRecordOnFailureFunction(
              (SerializableFunction<?, TableRow>)
                  fromByteArray(formatRecordOnFailureFunctionBytes));
    }
    byte[] avroRowWriterFactoryBytes = configRow.getBytes("avro_row_writer_factory");
    if (avroRowWriterFactoryBytes != null) {
      builder =
          builder.setAvroRowWriterFactory(
              (AvroRowWriterFactory) fromByteArray(avroRowWriterFactoryBytes));
    }
    byte[] avroSchemaFactoryBytes = configRow.getBytes("avro_schema_factory");
    if (avroSchemaFactoryBytes != null) {
      builder =
          builder.setAvroSchemaFactory((SerializableFunction) fromByteArray(avroSchemaFactoryBytes));
    }
    Boolean useAvroLogicalTypes = configRow.getBoolean("use_avro_logical_types");
    if (useAvroLogicalTypes != null) {
      builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
    }
    byte[] dynamicDestinationsBytes = configRow.getBytes("dynamic_destinations");
    if (dynamicDestinationsBytes != null) {
      builder =
          builder.setDynamicDestinations(
              (DynamicDestinations) fromByteArray(dynamicDestinationsBytes));
    }
    String jsonSchema = configRow.getString("json_schema");
    if (jsonSchema != null) {
      builder = builder.setJsonSchema(StaticValueProvider.of(jsonSchema));
    }
    String jsonTimePartitioning = configRow.getString("json_time_partitioning");
    if (jsonTimePartitioning != null) {
      builder = builder.setJsonTimePartitioning(StaticValueProvider.of(jsonTimePartitioning));
    }
    // Translation with Clustering is broken before 2.56.0, where we used to attempt to
    // serialize a non-serializable Clustering object to bytes.
    // In 2.56.0 onwards, we translate using the json string representation instead.
    if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.56.0") >= 0) {
      String jsonClustering = configRow.getString("clustering");
      if (jsonClustering != null) {
        builder = builder.setJsonClustering(StaticValueProvider.of(jsonClustering));
      }
    }
    byte[] createDispositionBytes = configRow.getBytes("create_disposition");
    if (createDispositionBytes != null) {
      builder =
          builder.setCreateDisposition((CreateDisposition) fromByteArray(createDispositionBytes));
    }
    byte[] writeDispositionBytes = configRow.getBytes("write_disposition");
    if (writeDispositionBytes != null) {
      builder =
          builder.setWriteDisposition((WriteDisposition) fromByteArray(writeDispositionBytes));
    }
    Collection<byte[]> schemaUpdateOptionsData = configRow.getArray("schema_update_options");
    if (schemaUpdateOptionsData != null) {
      Set<SchemaUpdateOption> schemaUpdateOptions =
          schemaUpdateOptionsData.stream()
              .map(
                  data -> {
                    try {
                      return (SchemaUpdateOption) fromByteArray(data);
                    } catch (InvalidClassException e) {
                      // Surfaced as an unchecked exception since this runs inside a stream.
                      throw new RuntimeException(e);
                    }
                  })
              .collect(Collectors.toSet());
      builder = builder.setSchemaUpdateOptions(schemaUpdateOptions);
    } else {
      // This property is not nullable.
      builder = builder.setSchemaUpdateOptions(Collections.emptySet());
    }
    String tableDescription = configRow.getString("table_description");
    if (tableDescription != null) {
      builder = builder.setTableDescription(tableDescription);
    }
    Map<String, String> biglakeConfiguration = configRow.getMap("biglake_configuration");
    if (biglakeConfiguration != null) {
      builder = builder.setBigLakeConfiguration(biglakeConfiguration);
    }
    Boolean validate = configRow.getBoolean("validate");
    if (validate != null) {
      builder = builder.setValidate(validate);
    }
    byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
    if (bigqueryServicesBytes != null) {
      try {
        builder =
            builder.setBigQueryServices((BigQueryServices) fromByteArray(bigqueryServicesBytes));
      } catch (InvalidClassException e) {
        // A custom BigQueryServices implementation may not survive an upgrade; fall back to
        // the default implementation rather than failing the whole translation.
        LOG.warn(
            "Could not use the provided `BigQueryServices` implementation when upgrading. "
                + "Using the default.");
        builder = builder.setBigQueryServices(new BigQueryServicesImpl());
      }
    }
    Integer maxFilesPerBundle = configRow.getInt32("max_files_per_bundle");
    if (maxFilesPerBundle != null) {
      builder = builder.setMaxFilesPerBundle(maxFilesPerBundle);
    }
    Long maxFileSize = configRow.getInt64("max_file_size");
    if (maxFileSize != null) {
      builder = builder.setMaxFileSize(maxFileSize);
    }
    Integer numFileShards = configRow.getInt32("num_file_shards");
    if (numFileShards != null) {
      builder = builder.setNumFileShards(numFileShards);
    }
    Integer numStorageWriteApiStreams = configRow.getInt32("num_storage_write_api_streams");
    if (numStorageWriteApiStreams != null) {
      builder = builder.setNumStorageWriteApiStreams(numStorageWriteApiStreams);
    }
    // These two fields only exist in config rows generated by Beam 2.60.0 and later; for older
    // rows, fall back to the defaults.
    if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.60.0") >= 0) {
      Boolean propagateSuccessfulStorageApiWrites =
          configRow.getBoolean("propagate_successful_storage_api_writes");
      if (propagateSuccessfulStorageApiWrites != null) {
        builder =
            builder.setPropagateSuccessfulStorageApiWrites(propagateSuccessfulStorageApiWrites);
      }
      byte[] predicate = configRow.getBytes("propagate_successful_storage_api_writes_predicate");
      if (predicate != null) {
        builder =
            builder.setPropagateSuccessfulStorageApiWritesPredicate(
                (Predicate<String>) fromByteArray(predicate));
      }
    } else {
      builder = builder.setPropagateSuccessfulStorageApiWrites(false);
      builder = builder.setPropagateSuccessfulStorageApiWritesPredicate(Predicates.alwaysTrue());
    }
    Integer maxFilesPerPartition = configRow.getInt32("max_files_per_partition");
    if (maxFilesPerPartition != null) {
      builder = builder.setMaxFilesPerPartition(maxFilesPerPartition);
    }
    Long maxBytesPerPartition = configRow.getInt64("max_bytes_per_partition");
    if (maxBytesPerPartition != null) {
      builder = builder.setMaxBytesPerPartition(maxBytesPerPartition);
    }
    // We need to update the 'triggerring_frequency' field name for pipelines that are upgraded
    // from Beam 2.53.0 due to https://github.com/apache/beam/pull/29785.
    // This is fixed for Beam 2.54.0 and later.
    String triggeringFrequencyFieldName =
        TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.53.0") == 0
            ? "triggerring_frequency"
            : "triggering_frequency";
    Duration triggeringFrequency = configRow.getValue(triggeringFrequencyFieldName);
    if (triggeringFrequency != null) {
      // Config rows store java.time.Duration; the builder expects joda Duration.
      builder =
          builder.setTriggeringFrequency(
              org.joda.time.Duration.millis(triggeringFrequency.toMillis()));
    }
    byte[] methodBytes = configRow.getBytes("method");
    if (methodBytes != null) {
      builder = builder.setMethod((Write.Method) fromByteArray(methodBytes));
    }
    String loadJobProjectId = configRow.getString("load_job_project_id");
    if (loadJobProjectId != null) {
      builder = builder.setLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
    }
    byte[] failedInsertRetryPolicyBytes = configRow.getBytes("failed_insert_retry_policy");
    if (failedInsertRetryPolicyBytes != null) {
      builder =
          builder.setFailedInsertRetryPolicy(
              (InsertRetryPolicy) fromByteArray(failedInsertRetryPolicyBytes));
    }
    String customGcsTempLocations = configRow.getString("custom_gcs_temp_location");
    if (customGcsTempLocations != null) {
      builder = builder.setCustomGcsTempLocation(StaticValueProvider.of(customGcsTempLocations));
    }
    Boolean extendedErrorInfo = configRow.getBoolean("extended_error_info");
    if (extendedErrorInfo != null) {
      builder = builder.setExtendedErrorInfo(extendedErrorInfo);
    }
    Boolean skipInvalidRows = configRow.getBoolean("skip_invalid_rows");
    if (skipInvalidRows != null) {
      builder = builder.setSkipInvalidRows(skipInvalidRows);
    }
    Boolean ignoreUnknownValues = configRow.getBoolean("ignore_unknown_values");
    if (ignoreUnknownValues != null) {
      builder = builder.setIgnoreUnknownValues(ignoreUnknownValues);
    }
    Boolean ignoreInsertIds = configRow.getBoolean("ignore_insert_ids");
    if (ignoreInsertIds != null) {
      builder = builder.setIgnoreInsertIds(ignoreInsertIds);
    }
    Integer maxRetryJobs = configRow.getInt32("max_retry_jobs");
    if (maxRetryJobs != null) {
      builder = builder.setMaxRetryJobs(maxRetryJobs);
    }
    String kmsKey = configRow.getString("kms_key");
    if (kmsKey != null) {
      builder = builder.setKmsKey(kmsKey);
    }
    Collection<String> primaryKey = configRow.getArray("primary_key");
    if (primaryKey != null && !primaryKey.isEmpty()) {
      // Fix: copy the key columns into the list. 'ImmutableList.of(primaryKey)' would produce a
      // single-element list whose only element is the whole collection.
      builder = builder.setPrimaryKey(ImmutableList.copyOf(primaryKey));
    }
    byte[] defaultMissingValueInterpretationsBytes =
        configRow.getBytes("default_missing_value_interpretation");
    if (defaultMissingValueInterpretationsBytes != null) {
      builder =
          builder.setDefaultMissingValueInterpretation(
              (MissingValueInterpretation) fromByteArray(defaultMissingValueInterpretationsBytes));
    }
    Boolean optimizeWrites = configRow.getBoolean("optimize_writes");
    if (optimizeWrites != null) {
      builder = builder.setOptimizeWrites(optimizeWrites);
    }
    Boolean useBeamSchema = configRow.getBoolean("use_beam_schema");
    if (useBeamSchema != null) {
      builder = builder.setUseBeamSchema(useBeamSchema);
    }
    Boolean autoSharding = configRow.getBoolean("auto_sharding");
    if (autoSharding != null) {
      builder = builder.setAutoSharding(autoSharding);
    }
    Boolean propagateSuccessful = configRow.getBoolean("propagate_successful");
    if (propagateSuccessful != null) {
      builder = builder.setPropagateSuccessful(propagateSuccessful);
    }
    Boolean autoSchemaUpdate = configRow.getBoolean("auto_schema_update");
    if (autoSchemaUpdate != null) {
      builder = builder.setAutoSchemaUpdate(autoSchemaUpdate);
    }
    byte[] writeProtosClasses = configRow.getBytes("write_protos_class");
    if (writeProtosClasses != null) {
      // Fix: deserialize the 'write_protos_class' bytes. Previously this branch incorrectly
      // reused 'defaultMissingValueInterpretationsBytes' (copy-paste error), restoring the
      // wrong value (or throwing NPE when that field was absent).
      builder = builder.setWriteProtosClass((Class) fromByteArray(writeProtosClasses));
    }
    Boolean directWriteProtos = configRow.getBoolean("direct_write_protos");
    if (directWriteProtos != null) {
      builder = builder.setDirectWriteProtos(directWriteProtos);
    }
    byte[] deterministicRecordIdFnBytes = configRow.getBytes("deterministic_record_id_fn");
    if (deterministicRecordIdFnBytes != null) {
      builder =
          builder.setDeterministicRecordIdFn(
              (SerializableFunction) fromByteArray(deterministicRecordIdFnBytes));
    }
    String writeTempDataset = configRow.getString("write_temp_dataset");
    if (writeTempDataset != null) {
      builder = builder.setWriteTempDataset(writeTempDataset);
    }
    byte[] rowMutationInformationFnBytes = configRow.getBytes("row_mutation_information_fn");
    if (rowMutationInformationFnBytes != null) {
      builder =
          builder.setRowMutationInformationFn(
              (SerializableFunction) fromByteArray(rowMutationInformationFnBytes));
    }
    if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) {
      // We need to use defaults here for BQ read/write transforms upgraded
      // from older Beam versions.
      // See https://github.com/apache/beam/issues/30534.
      builder = builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
      builder = builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
    } else {
      byte[] badRecordRouter = configRow.getBytes("bad_record_router");
      builder = builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
      byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
      builder =
          builder.setBadRecordErrorHandler(
              (ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));
    }
    return builder.build();
  } catch (InvalidClassException e) {
    throw new RuntimeException(e);
  }
}