in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTranslation.java [208:359]
public TypedRead<?> fromConfigRow(Row configRow, PipelineOptions options) {
String updateCompatibilityBeamVersion =
options.as(StreamingOptions.class).getUpdateCompatibilityVersion();
// We need to set a default 'updateCompatibilityBeamVersion' here since this PipelineOption
// is not correctly passed in for pipelines that use Beam 2.53.0.
// This is fixed for Beam 2.54.0 and later.
updateCompatibilityBeamVersion =
(updateCompatibilityBeamVersion != null) ? updateCompatibilityBeamVersion : "2.53.0";
try {
BigQueryIO.TypedRead.Builder builder = new AutoValue_BigQueryIO_TypedRead.Builder<>();
String jsonTableRef = configRow.getString("json_table_ref");
if (jsonTableRef != null) {
builder = builder.setJsonTableRef(StaticValueProvider.of(jsonTableRef));
}
String query = configRow.getString("query");
if (query != null) {
builder = builder.setQuery(StaticValueProvider.of(query));
}
Boolean validate = configRow.getBoolean("validate");
if (validate != null) {
builder = builder.setValidate(validate);
}
Boolean flattenResults = configRow.getBoolean("flatten_results");
if (flattenResults != null) {
builder = builder.setFlattenResults(flattenResults);
}
Boolean useLegacySQL = configRow.getBoolean("use_legacy_sql");
if (useLegacySQL != null) {
builder = builder.setUseLegacySql(useLegacySQL);
}
Boolean withTemplateCompatibility = configRow.getBoolean("with_template_compatibility");
if (withTemplateCompatibility != null) {
builder = builder.setWithTemplateCompatibility(withTemplateCompatibility);
}
byte[] bigqueryServicesBytes = configRow.getBytes("bigquery_services");
if (bigqueryServicesBytes != null) {
try {
builder =
builder.setBigQueryServices(
(BigQueryServices) fromByteArray(bigqueryServicesBytes));
} catch (InvalidClassException e) {
LOG.warn(
"Could not use the provided `BigQueryServices` implementation when upgrading."
+ "Using the default.");
builder.setBigQueryServices(new BigQueryServicesImpl());
}
}
byte[] parseFnBytes = configRow.getBytes("parse_fn");
if (parseFnBytes != null) {
builder = builder.setParseFn((SerializableFunction) fromByteArray(parseFnBytes));
}
byte[] datumReaderFactoryBytes = configRow.getBytes("datum_reader_factory");
if (datumReaderFactoryBytes != null) {
builder =
builder.setDatumReaderFactory(
(SerializableFunction) fromByteArray(datumReaderFactoryBytes));
}
byte[] queryPriorityBytes = configRow.getBytes("query_priority");
if (queryPriorityBytes != null) {
builder = builder.setQueryPriority((QueryPriority) fromByteArray(queryPriorityBytes));
}
String queryLocation = configRow.getString("query_location");
if (queryLocation != null) {
builder = builder.setQueryLocation(queryLocation);
}
String queryTempDataset = configRow.getString("query_temp_dataset");
if (queryTempDataset != null) {
builder = builder.setQueryTempDataset(queryTempDataset);
}
if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.57.0") >= 0) {
// This property was added for Beam 2.57.0 hence not available when
// upgrading the transform from previous Beam versions.
String queryTempProject = configRow.getString("query_temp_project");
if (queryTempProject != null) {
builder = builder.setQueryTempProject(queryTempProject);
}
}
byte[] methodBytes = configRow.getBytes("method");
if (methodBytes != null) {
builder = builder.setMethod((TypedRead.Method) fromByteArray(methodBytes));
}
byte[] formatBytes = configRow.getBytes("format");
if (formatBytes != null) {
builder = builder.setFormat((DataFormat) fromByteArray(formatBytes));
}
Collection<String> selectedFields = configRow.getArray("selected_fields");
if (selectedFields != null && !selectedFields.isEmpty()) {
builder.setSelectedFields(StaticValueProvider.of(ImmutableList.of(selectedFields)));
}
String rowRestriction = configRow.getString("row_restriction");
if (rowRestriction != null) {
builder = builder.setRowRestriction(StaticValueProvider.of(rowRestriction));
}
byte[] coderBytes = configRow.getBytes("coder");
if (coderBytes != null) {
try {
builder = builder.setCoder((Coder) fromByteArray(coderBytes));
} catch (InvalidClassException e) {
LOG.warn(
"Could not use the provided `Coder` implementation when upgrading."
+ "Using the default.");
}
}
String kmsKey = configRow.getString("kms_key");
if (kmsKey != null) {
builder = builder.setKmsKey(kmsKey);
}
byte[] typeDescriptorBytes = configRow.getBytes("type_descriptor");
if (typeDescriptorBytes != null) {
builder = builder.setTypeDescriptor((TypeDescriptor) fromByteArray(typeDescriptorBytes));
}
byte[] toBeamRowFnBytes = configRow.getBytes("to_beam_row_fn");
if (toBeamRowFnBytes != null) {
builder = builder.setToBeamRowFn((ToBeamRowFunction) fromByteArray(toBeamRowFnBytes));
}
byte[] fromBeamRowFnBytes = configRow.getBytes("from_beam_row_fn");
if (fromBeamRowFnBytes != null) {
builder =
builder.setFromBeamRowFn((FromBeamRowFunction) fromByteArray(fromBeamRowFnBytes));
}
Boolean useAvroLogicalTypes = configRow.getBoolean("use_avro_logical_types");
if (useAvroLogicalTypes != null) {
builder = builder.setUseAvroLogicalTypes(useAvroLogicalTypes);
}
Boolean projectionPushdownApplied = configRow.getBoolean("projection_pushdown_applied");
if (projectionPushdownApplied != null) {
builder = builder.setProjectionPushdownApplied(projectionPushdownApplied);
}
if (TransformUpgrader.compareVersions(updateCompatibilityBeamVersion, "2.55.0") < 0) {
// We need to use defaults here for BQ rear/write transforms upgraded
// from older Beam versions.
// See https://github.com/apache/beam/issues/30534.
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new BadRecordErrorHandler.DefaultErrorHandler<>());
} else {
byte[] badRecordRouter = configRow.getBytes("bad_record_router");
builder.setBadRecordRouter((BadRecordRouter) fromByteArray(badRecordRouter));
byte[] badRecordErrorHandler = configRow.getBytes("bad_record_error_handler");
builder.setBadRecordErrorHandler(
(ErrorHandler<BadRecord, ?>) fromByteArray(badRecordErrorHandler));
}
return builder.build();
} catch (InvalidClassException e) {
throw new RuntimeException(e);
}
}