in sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java [3397:3592]
public WriteResult expand(PCollection<T> input) {
// We must have a destination to write to!
checkArgument(
getTableFunction() != null
|| getJsonTableRef() != null
|| getDynamicDestinations() != null,
"must set the table reference of a BigQueryIO.Write transform");
List<?> allToArgs =
Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations());
checkArgument(
1
== Iterables.size(
allToArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"Exactly one of jsonTableRef, tableFunction, or dynamicDestinations must be set");
List<?> allSchemaArgs =
Lists.newArrayList(getJsonSchema(), getSchemaFromView(), getDynamicDestinations());
checkArgument(
2
> Iterables.size(
allSchemaArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"No more than one of jsonSchema, schemaFromView, or dynamicDestinations may be set");
// Perform some argument checks
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
Write.Method method = resolveMethod(input);
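// Validate the triggering and sharding options against the resolved write method. Unbounded
// inputs require a triggering frequency for FILE_LOADS and STORAGE_WRITE_API; bounded inputs
// must not set any of the streaming-only options.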
if (input.isBounded() == IsBounded.UNBOUNDED) {
if (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API) {
Duration triggeringFrequency =
(method == Write.Method.STORAGE_WRITE_API)
? getStorageApiTriggeringFrequency(bqOptions)
: getTriggeringFrequency();
checkArgument(
triggeringFrequency != null,
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_WRITE_API, "
+ "triggering frequency must be specified");
} else {
checkArgument(
getTriggeringFrequency() == null,
"Triggering frequency can be specified only when writing via FILE_LOADS or STORAGE_WRITE_API, but the method was %s.",
method);
}
if (method != Method.FILE_LOADS) {
checkArgument(
getNumFileShards() == 0,
"Number of file shards can be specified only when writing via FILE_LOADS, but the method was %s.",
method);
}
if (method == Method.STORAGE_API_AT_LEAST_ONCE
&& getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.warn(
"Storage API triggering frequency option will be ignored is it can only be specified only "
+ "when writing via STORAGE_WRITE_API, but the method was {}.",
method);
}
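// Auto-sharding takes precedence over an explicitly configured number of streams or file
// shards, and is ignored by STORAGE_API_AT_LEAST_ONCE; warn rather than fail in these cases.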
if (getAutoSharding()) {
if (method == Method.STORAGE_WRITE_API && getStorageApiNumStreams(bqOptions) > 0) {
LOG.warn(
"Both numStorageWriteApiStreams and auto-sharding options are set. Will default to auto-sharding."
+ " To set a fixed number of streams, do not enable auto-sharding.");
} else if (method == Method.FILE_LOADS && getNumFileShards() > 0) {
LOG.warn(
"Both numFileShards and auto-sharding options are set. Will default to auto-sharding."
+ " To set a fixed number of file shards, do not enable auto-sharding.");
} else if (method == Method.STORAGE_API_AT_LEAST_ONCE) {
LOG.warn(
"The auto-sharding setting is ignored. It is only supported when writing an"
+ " unbounded PCollection via FILE_LOADS, STREAMING_INSERTS or"
+ " STORAGE_WRITE_API, but the method was {}.",
method);
}
}
} else { // PCollection is bounded
String error =
String.format(
" is only applicable to an unbounded PCollection, but the input PCollection is %s.",
input.isBounded());
checkArgument(getTriggeringFrequency() == null, "Triggering frequency" + error);
checkArgument(!getAutoSharding(), "Auto-sharding" + error);
checkArgument(getNumFileShards() == 0, "Number of file shards" + error);
if (getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.warn("Setting a triggering frequency" + error);
}
if (getStorageApiNumStreams(bqOptions) != 0) {
LOG.warn("Setting the number of Storage API streams" + error);
}
}
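// The number of Storage API streams is only honored by the exactly-once STORAGE_WRITE_API
// method, so warn if it is set together with STORAGE_API_AT_LEAST_ONCE.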
if (method == Method.STORAGE_API_AT_LEAST_ONCE && getStorageApiNumStreams(bqOptions) != 0) {
LOG.warn(
"Setting a number of Storage API streams is only supported when using STORAGE_WRITE_API");
}
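// Schema auto-update and BigLake configuration are only implemented for the Storage Write API
// methods; reject them for the other write methods.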
if (method != Method.STORAGE_WRITE_API && method != Method.STORAGE_API_AT_LEAST_ONCE) {
checkArgument(
!getAutoSchemaUpdate(),
"withAutoSchemaUpdate only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
checkArgument(
getBigLakeConfiguration() == null,
"bigLakeConfiguration is only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
} else {
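// Storage Write API specific checks: WRITE_TRUNCATE is not supported, and a BigLake
// configuration must name both a connection and a storage URI.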
if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) {
LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
}
if (getBigLakeConfiguration() != null) {
checkArgument(
Arrays.stream(new String[] {CONNECTION_ID, STORAGE_URI})
.allMatch(getBigLakeConfiguration()::containsKey),
String.format(
"bigLakeConfiguration must contain keys '%s' and '%s'",
CONNECTION_ID, STORAGE_URI));
}
}
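// Row mutations (CDC upserts/deletes) are only applied through the Storage Write API in
// at-least-once mode, and a primary key is required unless the destination table already
// exists (CREATE_NEVER).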
if (getRowMutationInformationFn() != null) {
checkArgument(
getMethod() == Method.STORAGE_API_AT_LEAST_ONCE,
"When using row updates on BigQuery, StorageWrite API should execute using"
+ " \"at least once\" mode.");
checkArgument(
getCreateDisposition() == CreateDisposition.CREATE_NEVER || getPrimaryKey() != null,
"If specifying CREATE_IF_NEEDED along with row updates, a primary key needs to be specified");
}
if (getPrimaryKey() != null) {
checkArgument(
getMethod() != Method.FILE_LOADS, "Primary key not supported when using FILE_LOADS");
}
if (getAutoSchemaUpdate()) {
// TODO(reuvenlax): Remove this restriction once we implement support.
checkArgument(
getIgnoreUnknownValues(),
"Auto schema update is currently only supported when ignoreUnknownValues is also set.");
checkArgument(
!getUseBeamSchema(), "Auto schema update is not supported when using Beam schemas.");
}
if (getJsonTimePartitioning() != null) {
checkArgument(
getDynamicDestinations() == null,
"The supplied DynamicDestinations object can directly set TimePartitioning."
+ " There is no need to call BigQueryIO.Write.withTimePartitioning.");
checkArgument(
getTableFunction() == null,
"The supplied table function can directly set TimePartitioning."
+ " There is no need to call BigQueryIO.Write.withTimePartitioning.");
}
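// If the caller did not supply a DynamicDestinations directly, build one from the constant
// table reference or the table function, then wrap it with decorators that add the schema,
// time partitioning, clustering, and table constraints configured on this transform.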
DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
if (dynamicDestinations == null) {
if (getJsonTableRef() != null) {
dynamicDestinations =
DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
getJsonTableRef(), getTableDescription(), getJsonClustering() != null);
} else if (getTableFunction() != null) {
dynamicDestinations =
new TableFunctionDestinations<>(getTableFunction(), getJsonClustering() != null);
}
// Wrap with a DynamicDestinations class that will provide a schema. There might be no
// schema provided if the create disposition is CREATE_NEVER.
if (getJsonSchema() != null) {
dynamicDestinations =
new ConstantSchemaDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonSchema());
} else if (getSchemaFromView() != null) {
dynamicDestinations =
new SchemaFromViewDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
getSchemaFromView());
}
// Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
if (getJsonTimePartitioning() != null || (getJsonClustering() != null)) {
dynamicDestinations =
new ConstantTimePartitioningClusteringDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
getJsonTimePartitioning(),
getJsonClustering());
}
if (getPrimaryKey() != null) {
dynamicDestinations =
new DynamicDestinationsHelpers.ConstantTableConstraintsDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
new TableConstraints()
.setPrimaryKey(
new TableConstraints.PrimaryKey().setColumns(getPrimaryKey())));
}
}
return expandTyped(input, dynamicDestinations);
}