in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [3731:3971]
private <DestinationT> WriteResult continueExpandTyped(
PCollection<KV<DestinationT, T>> input,
Coder<T> elementCoder,
@Nullable Schema elementSchema,
@Nullable SerializableFunction<T, Row> elementToRowFunction,
Coder<DestinationT> destinationCoder,
DynamicDestinations<T, DestinationT> dynamicDestinations,
RowWriterFactory<T, DestinationT> rowWriterFactory,
Write.Method method) {
if (method == Write.Method.STREAMING_INSERTS) {
checkArgument(
getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
"WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection.");
InsertRetryPolicy retryPolicy =
MoreObjects.firstNonNull(getFailedInsertRetryPolicy(), InsertRetryPolicy.alwaysRetry());
checkArgument(
rowWriterFactory.getOutputType() == RowWriterFactory.OutputType.JsonTableRow,
"Avro output is not supported when method == STREAMING_INSERTS");
checkArgument(
getSchemaUpdateOptions() == null || getSchemaUpdateOptions().isEmpty(),
"SchemaUpdateOptions are not supported when method == STREAMING_INSERTS");
checkArgument(
!getPropagateSuccessfulStorageApiWrites(),
"withPropagateSuccessfulStorageApiWrites only supported when using storage api writes.");
checkArgument(
getBadRecordRouter() instanceof ThrowingBadRecordRouter,
"Error Handling is not supported with STREAMING_INSERTS");
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
StreamingInserts<DestinationT, T> streamingInserts =
new StreamingInserts<>(
getCreateDisposition(),
dynamicDestinations,
elementCoder,
tableRowWriterFactory.getToRowFn(),
tableRowWriterFactory.getToFailsafeRowFn())
.withInsertRetryPolicy(retryPolicy)
.withTestServices(getBigQueryServices())
.withExtendedErrorInfo(getExtendedErrorInfo())
.withSkipInvalidRows(getSkipInvalidRows())
.withIgnoreUnknownValues(getIgnoreUnknownValues())
.withIgnoreInsertIds(getIgnoreInsertIds())
.withAutoSharding(getAutoSharding())
.withSuccessfulInsertsPropagation(getPropagateSuccessful())
.withDeterministicRecordIdFn(getDeterministicRecordIdFn())
.withKmsKey(getKmsKey());
return input.apply(streamingInserts);
} else if (method == Write.Method.FILE_LOADS) {
checkArgument(
getFailedInsertRetryPolicy() == null,
"Record-insert retry policies are not supported when using BigQuery load jobs.");
if (getUseAvroLogicalTypes()) {
checkArgument(
rowWriterFactory.getOutputType() == OutputType.AvroGenericRecord,
"useAvroLogicalTypes can only be set with Avro output.");
}
checkArgument(
!getPropagateSuccessfulStorageApiWrites(),
"withPropagateSuccessfulStorageApiWrites only supported when using storage api writes.");
if (!(getBadRecordRouter() instanceof ThrowingBadRecordRouter)) {
LOG.warn(
"Error Handling is partially supported when using FILE_LOADS. Consider using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE");
}
// Batch load handles wrapped JSON string values differently than the other methods, so raise a
// warning when that applies.
if (getJsonSchema() != null && getJsonSchema().isAccessible()) {
JsonElement schema = JsonParser.parseString(getJsonSchema().get());
if (!schema.getAsJsonObject().keySet().isEmpty() && hasJsonTypeInSchema(schema)) {
if (rowWriterFactory.getOutputType() == OutputType.JsonTableRow) {
LOG.warn(
"Found JSON type in TableSchema for 'FILE_LOADS' write method. \n"
+ "Make sure the TableRow value is a Jackson JsonNode to ensure the read as a "
+ "JSON type. Otherwise it will read as a raw (escaped) string.\n"
+ "See https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json#limitations "
+ "for limitations.");
} else if (rowWriterFactory.getOutputType() == OutputType.AvroGenericRecord) {
LOG.warn(
"Found JSON type in TableSchema for 'FILE_LOADS' write method. \n"
+ " check steps in https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro#extract_json_data_from_avro_data "
+ " to ensure the read as a JSON type. Otherwise it will read as a raw "
+ "(escaped) string.");
}
}
}
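// Illustrative sketch: how a caller might populate a JSON-typed column so that FILE_LOADS
// preserves it as JSON rather than an escaped string. The column name and Jackson ObjectMapper
// usage are hypothetical examples (ObjectMapper#readTree throws a checked exception, omitted
// here for brevity).
//
//   TableRow row = new TableRow();
//   row.set("json_field", new ObjectMapper().readTree("{\"a\": 1}")); // Jackson JsonNode value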
BatchLoads<DestinationT, T> batchLoads =
new BatchLoads<>(
getWriteDisposition(),
getCreateDisposition(),
getJsonTableRef() != null,
dynamicDestinations,
destinationCoder,
getCustomGcsTempLocation(),
getLoadJobProjectId(),
getIgnoreUnknownValues(),
elementCoder,
rowWriterFactory,
getKmsKey(),
getJsonClustering() != null,
getUseAvroLogicalTypes(),
getWriteTempDataset(),
getBadRecordRouter(),
getBadRecordErrorHandler());
batchLoads.setTestServices(getBigQueryServices());
if (getSchemaUpdateOptions() != null) {
batchLoads.setSchemaUpdateOptions(getSchemaUpdateOptions());
}
if (getMaxFilesPerBundle() != null) {
batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
}
if (getMaxFileSize() != null) {
batchLoads.setMaxFileSize(getMaxFileSize());
}
batchLoads.setMaxFilesPerPartition(getMaxFilesPerPartition());
batchLoads.setMaxBytesPerPartition(getMaxBytesPerPartition());
// When running in streaming (unbounded) mode we want to keep retrying failed load jobs rather
// than fail the bundle, which is expensive, so we set a fairly high limit on retries.
if (IsBounded.UNBOUNDED.equals(input.isBounded())) {
batchLoads.setMaxRetryJobs(getMaxRetryJobs());
}
batchLoads.setTriggeringFrequency(getTriggeringFrequency());
if (getAutoSharding()) {
batchLoads.setNumFileShards(0);
} else {
batchLoads.setNumFileShards(getNumFileShards());
}
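// Illustrative sketch: a hypothetical caller configuration that takes this FILE_LOADS path on
// an unbounded input, supplying the triggering frequency and file shards used above (values
// are examples only).
//
//   rows.apply(
//       BigQueryIO.writeTableRows()
//           .to("my-project:my_dataset.my_table")
//           .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
//           .withTriggeringFrequency(Duration.standardMinutes(5))
//           .withNumFileShards(100));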
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) {
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
if (getUseBeamSchema()) {
// This ensures that the Beam rows are directly translated into protos for Storage API writes,
// with no need to round-trip through JSON TableRow objects.
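// Illustrative sketch: a schema-aware write that takes this Beam-row path. "MyPojo" is a
// hypothetical type with a registered Beam schema; the table name is an example.
//
//   pojos.apply(
//       BigQueryIO.<MyPojo>write()
//           .to("my-project:my_dataset.my_table")
//           .useBeamSchema()
//           .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));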
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsBeamRow<>(
dynamicDestinations,
elementSchema,
elementToRowFunction,
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else if (getWriteProtosClass() != null && getDirectWriteProtos()) {
// We could support both of these by falling back to StorageApiDynamicDestinationsTableRow.
// This would defeat the optimization (we would be forced to create a new dynamic proto
// message and copy the data over). For now, we simply give the user a way to disable the
// optimization themselves.
checkArgument(
getRowMutationInformationFn() == null,
"Row upserts and deletes are not for direct proto writes. "
+ "Try setting withDirectWriteProtos(false)");
checkArgument(
!getAutoSchemaUpdate(),
"withAutoSchemaUpdate not supported when using writeProtos."
+ " Try setting withDirectWriteProtos(false)");
checkArgument(
!getIgnoreUnknownValues(),
"ignoreUnknownValues not supported when using writeProtos."
+ " Try setting withDirectWriteProtos(false)");
storageApiDynamicDestinations =
(StorageApiDynamicDestinations<T, DestinationT>)
new StorageApiDynamicDestinationsProto(
dynamicDestinations,
getWriteProtosClass(),
getFormatRecordOnFailureFunction());
} else if (getAvroRowWriterFactory() != null) {
// We can configure the Avro-to-Storage Write API proto converter for this path, assuming the
// format function returns an Avro GenericRecord and a schema is defined.
checkArgument(
getJsonSchema() != null
|| getDynamicDestinations() != null
|| getSchemaFromView() != null,
"A schema must be provided for avro rows to be used with StorageWrite API.");
RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>
recordWriterFactory =
(RowWriterFactory.AvroRowWriterFactory<T, GenericRecord, DestinationT>)
rowWriterFactory;
SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema> avroSchemaFactory =
Optional.ofNullable(getAvroSchemaFactory()).orElse(DEFAULT_AVRO_SCHEMA_FACTORY);
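// Illustrative sketch: an Avro-based write that reaches this branch. The element type, table
// name, and toGenericRecord helper are hypothetical examples.
//
//   events.apply(
//       BigQueryIO.<Event>write()
//           .to("my-project:my_dataset.my_table")
//           .withSchema(tableSchema)
//           .withAvroFormatFunction(
//               request -> toGenericRecord(request.getElement(), request.getSchema()))
//           .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));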
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsGenericRecord<>(
dynamicDestinations,
avroSchemaFactory,
recordWriterFactory.getToAvroFn(),
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null);
} else {
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
// Fallback behavior: convert elements to JSON TableRows, which are then converted into protos
// for the Storage API.
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsTableRow<>(
dynamicDestinations,
tableRowWriterFactory.getToRowFn(),
getFormatRecordOnFailureFunction(),
getRowMutationInformationFn() != null,
getCreateDisposition(),
getIgnoreUnknownValues(),
getAutoSchemaUpdate());
}
int numShards = getStorageApiNumStreams(bqOptions);
boolean enableAutoSharding = getAutoSharding();
if (numShards == 0) {
enableAutoSharding = true;
}
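// Illustrative note: the stream count here typically comes from the --numStorageWriteApiStreams
// pipeline option (via getStorageApiNumStreams); when it resolves to 0, auto-sharding is enabled
// instead. Example (hypothetical value): --numStorageWriteApiStreams=20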
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<>(
destinationCoder,
storageApiDynamicDestinations,
getRowMutationInformationFn(),
getCreateDisposition(),
getKmsKey(),
getStorageApiTriggeringFrequency(bqOptions),
getBigQueryServices(),
getStorageApiNumStreams(bqOptions),
method == Method.STORAGE_API_AT_LEAST_ONCE,
enableAutoSharding,
getAutoSchemaUpdate(),
getIgnoreUnknownValues(),
getPropagateSuccessfulStorageApiWrites(),
getPropagateSuccessfulStorageApiWritesPredicate(),
getRowMutationInformationFn() != null,
getDefaultMissingValueInterpretation(),
getBigLakeConfiguration(),
getBadRecordRouter(),
getBadRecordErrorHandler());
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
}
}