in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java [453:614]
/**
 * Converts a {@link Write} transform into a portable, row-based config conforming to
 * {@code schema}.
 *
 * <p>Plain values (strings, booleans, numbers, lists, maps) are stored directly in the row.
 * Opaque properties are stored as the byte arrays produced by {@code toByteArray}. These include
 * functions, dispositions, methods, services, retry policies and error handlers.
 * {@code ValueProvider}-backed properties are read eagerly via {@code get()}. Properties that
 * are guarded by a null check below are left unset in the row when absent.
 *
 * @param transform the configured write transform to translate
 * @return a {@link Row} holding the transform's configuration
 * @throws IllegalArgumentException if the transform was configured with
 *     {@code withSchemaFromView}, which is bound to PCollections of the current pipeline instance
 *     and therefore cannot be represented portably
 */
public Row toConfigRow(Write<?> transform) {
  // Reject the non-portable option before doing any (potentially expensive) serialization of the
  // transform's other properties; previously this check ran only after many of them had already
  // been serialized and would then be discarded.
  if (transform.getSchemaFromView() != null) {
    // Property 'getSchemaFromView' cannot be used in a portable way across pipelines since it
    // is bound to PCollections generated for the current pipeline instance.
    throw new IllegalArgumentException(
        "BigQueryIO.Write transforms cannot be converted to a "
            + "portable row based config due to 'withSchemaFromView' property being set. Please "
            + "retry without setting this property when configuring your transform");
  }

  Map<String, Object> fieldValues = new HashMap<>();

  // Destination, formatting and schema-related properties.
  if (transform.getJsonTableRef() != null) {
    fieldValues.put("json_table_ref", transform.getJsonTableRef().get());
  }
  if (transform.getTableFunction() != null) {
    fieldValues.put("table_function", toByteArray(transform.getTableFunction()));
  }
  if (transform.getFormatFunction() != null) {
    fieldValues.put("format_function", toByteArray(transform.getFormatFunction()));
  }
  if (transform.getFormatRecordOnFailureFunction() != null) {
    fieldValues.put(
        "format_record_on_failure_function",
        toByteArray(transform.getFormatRecordOnFailureFunction()));
  }
  if (transform.getAvroRowWriterFactory() != null) {
    fieldValues.put(
        "avro_row_writer_factory", toByteArray(transform.getAvroRowWriterFactory()));
  }
  fieldValues.put("use_avro_logical_types", transform.getUseAvroLogicalTypes());
  if (transform.getDynamicDestinations() != null) {
    fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations()));
  }
  if (transform.getJsonSchema() != null) {
    fieldValues.put("json_schema", transform.getJsonSchema().get());
  }
  if (transform.getJsonTimePartitioning() != null) {
    // Stored as serialized bytes rather than as a plain string field.
    fieldValues.put(
        "json_time_partitioning", toByteArray(transform.getJsonTimePartitioning().get()));
  }
  if (transform.getJsonClustering() != null) {
    fieldValues.put("clustering", transform.getJsonClustering().get());
  }

  // Table creation / write semantics.
  if (transform.getCreateDisposition() != null) {
    fieldValues.put("create_disposition", toByteArray(transform.getCreateDisposition()));
  }
  if (transform.getWriteDisposition() != null) {
    fieldValues.put("write_disposition", toByteArray(transform.getWriteDisposition()));
  }
  if (transform.getSchemaUpdateOptions() != null
      && !transform.getSchemaUpdateOptions().isEmpty()) {
    // Each option is serialized individually so the row holds a list of byte arrays.
    List<byte[]> schemaUpdateOptionsData =
        transform.getSchemaUpdateOptions().stream()
            .map(option -> toByteArray(option))
            .collect(Collectors.toList());
    fieldValues.put("schema_update_options", schemaUpdateOptionsData);
  }
  if (transform.getTableDescription() != null) {
    fieldValues.put("table_description", transform.getTableDescription());
  }
  if (transform.getBigLakeConfiguration() != null) {
    fieldValues.put("biglake_configuration", transform.getBigLakeConfiguration());
  }
  fieldValues.put("validate", transform.getValidate());
  if (transform.getBigQueryServices() != null) {
    fieldValues.put("bigquery_services", toByteArray(transform.getBigQueryServices()));
  }

  // Batching, sharding and Storage Write API tuning.
  if (transform.getMaxFilesPerBundle() != null) {
    fieldValues.put("max_files_per_bundle", transform.getMaxFilesPerBundle());
  }
  if (transform.getMaxFileSize() != null) {
    fieldValues.put("max_file_size", transform.getMaxFileSize());
  }
  fieldValues.put("num_file_shards", transform.getNumFileShards());
  fieldValues.put("num_storage_write_api_streams", transform.getNumStorageWriteApiStreams());
  fieldValues.put(
      "propagate_successful_storage_api_writes",
      transform.getPropagateSuccessfulStorageApiWrites());
  fieldValues.put(
      "propagate_successful_storage_api_writes_predicate",
      toByteArray(transform.getPropagateSuccessfulStorageApiWritesPredicate()));
  fieldValues.put("max_files_per_partition", transform.getMaxFilesPerPartition());
  fieldValues.put("max_bytes_per_partition", transform.getMaxBytesPerPartition());
  if (transform.getTriggeringFrequency() != null) {
    // Re-expressed as a Duration built from the millisecond value for storage in the row.
    fieldValues.put(
        "triggering_frequency",
        Duration.ofMillis(transform.getTriggeringFrequency().getMillis()));
  }
  if (transform.getMethod() != null) {
    fieldValues.put("method", toByteArray(transform.getMethod()));
  }
  if (transform.getLoadJobProjectId() != null) {
    fieldValues.put("load_job_project_id", transform.getLoadJobProjectId());
  }
  if (transform.getFailedInsertRetryPolicy() != null) {
    fieldValues.put(
        "failed_insert_retry_policy", toByteArray(transform.getFailedInsertRetryPolicy()));
  }
  if (transform.getCustomGcsTempLocation() != null) {
    fieldValues.put("custom_gcs_temp_location", transform.getCustomGcsTempLocation().get());
  }

  // Insert behaviour and error reporting.
  fieldValues.put("extended_error_info", transform.getExtendedErrorInfo());
  if (transform.getSkipInvalidRows() != null) {
    fieldValues.put("skip_invalid_rows", transform.getSkipInvalidRows());
  }
  if (transform.getIgnoreUnknownValues() != null) {
    fieldValues.put("ignore_unknown_values", transform.getIgnoreUnknownValues());
  }
  if (transform.getIgnoreInsertIds() != null) {
    fieldValues.put("ignore_insert_ids", transform.getIgnoreInsertIds());
  }
  fieldValues.put("max_retry_jobs", transform.getMaxRetryJobs());
  if (transform.getPropagateSuccessful() != null) {
    fieldValues.put("propagate_successful", transform.getPropagateSuccessful());
  }
  if (transform.getKmsKey() != null) {
    fieldValues.put("kms_key", transform.getKmsKey());
  }
  if (transform.getPrimaryKey() != null) {
    fieldValues.put("primary_key", transform.getPrimaryKey());
  }
  if (transform.getDefaultMissingValueInterpretation() != null) {
    fieldValues.put(
        "default_missing_value_interpretation",
        toByteArray(transform.getDefaultMissingValueInterpretation()));
  }
  if (transform.getOptimizeWrites() != null) {
    fieldValues.put("optimize_writes", transform.getOptimizeWrites());
  }
  if (transform.getUseBeamSchema() != null) {
    fieldValues.put("use_beam_schema", transform.getUseBeamSchema());
  }
  if (transform.getAutoSharding() != null) {
    fieldValues.put("auto_sharding", transform.getAutoSharding());
  }
  if (transform.getAutoSchemaUpdate() != null) {
    fieldValues.put("auto_schema_update", transform.getAutoSchemaUpdate());
  }
  if (transform.getWriteProtosClass() != null) {
    fieldValues.put("write_protos_class", toByteArray(transform.getWriteProtosClass()));
  }
  if (transform.getDirectWriteProtos() != null) {
    fieldValues.put("direct_write_protos", transform.getDirectWriteProtos());
  }
  if (transform.getDeterministicRecordIdFn() != null) {
    fieldValues.put(
        "deterministic_record_id_fn", toByteArray(transform.getDeterministicRecordIdFn()));
  }
  if (transform.getWriteTempDataset() != null) {
    fieldValues.put("write_temp_dataset", toByteArray(transform.getWriteTempDataset()));
  }
  if (transform.getRowMutationInformationFn() != null) {
    fieldValues.put(
        "row_mutation_information_fn", toByteArray(transform.getRowMutationInformationFn()));
  }
  fieldValues.put("bad_record_router", toByteArray(transform.getBadRecordRouter()));
  fieldValues.put(
      "bad_record_error_handler", toByteArray(transform.getBadRecordErrorHandler()));

  return Row.withSchema(schema).withFieldValues(fieldValues).build();
}