in sdks/python/apache_beam/io/gcp/bigquery.py [0:0]
def __init__(
self,
table,
dataset=None,
project=None,
schema=None,
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
kms_key=None,
batch_size=None,
max_file_size=None,
max_partition_size=None,
max_files_per_bundle=None,
test_client=None,
custom_gcs_temp_location=None,
method=None,
insert_retry_strategy=None,
additional_bq_parameters=None,
table_side_inputs=None,
schema_side_inputs=None,
triggering_frequency=None,
use_at_least_once=False,
validate=True,
temp_file_format=None,
ignore_insert_ids=False,
# TODO(https://github.com/apache/beam/issues/20712): Switch the default
# when the feature is mature.
with_auto_sharding=False,
num_storage_api_streams=0,
ignore_unknown_columns=False,
load_job_project_id=None,
max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
use_cdc_writes: bool = False,
primary_key: List[str] = None,
expansion_service=None):
"""Initialize a WriteToBigQuery transform.
Args:
table (str, callable, ValueProvider): The ID of the table, or a callable
that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
numbers ``0-9``, or connectors ``-_``. If the dataset argument is
:data:`None`, then the table argument must contain the entire table
reference, specified as ``'DATASET.TABLE'``
or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
argument representing an element to be written to BigQuery, and return
a TableReference or a string table name as specified above.
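For example, a minimal sketch (the project, dataset, table, and field
names below are placeholders)::

  # A fully qualified table reference passed as a single string.
  table='my-project:my_dataset.my_table'

  # Or a callable that picks a destination table per element; here
  # 'event_type' is an illustrative field of the incoming element.
  table=lambda row: 'my_dataset.events_%s' % row['event_type']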
dataset (str): The ID of the dataset containing this table or
:data:`None` if the table reference is specified entirely by the table
argument.
project (str): The ID of the project containing this table or
:data:`None` if the table reference is specified entirely by the table
argument.
schema (str,dict,ValueProvider,callable): The schema to be used if the
BigQuery table to write has to be created. This can be specified as a
:class:`~apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.TableSchema` object, a Python dictionary, a
`ValueProvider` wrapping a JSON string, or a single string of the form
``'field1:type1,field2:type2,field3:type3'`` that defines a comma
separated list of fields. Here ``'type'`` should specify the BigQuery
type of the field. Single string based schemas do not support nested
fields, repeated fields, or specifying a BigQuery mode for fields
(mode will always be set to ``'NULLABLE'``).
If a callable, it should receive a destination (in the form of a str)
and return a str, dict or TableSchema.
One may also pass ``SCHEMA_AUTODETECT`` here when using JSON-based
file loads, and BigQuery will try to infer the schema for the files
that are being loaded.
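For example, a sketch with illustrative field names::

  # Single-string schema (all fields will be NULLABLE).
  schema='user_id:INTEGER,user_name:STRING,score:FLOAT'

  # Equivalent dictionary schema, which also allows modes and
  # nested or repeated fields.
  schema={
      'fields': [
          {'name': 'user_id', 'type': 'INTEGER', 'mode': 'NULLABLE'},
          {'name': 'user_name', 'type': 'STRING', 'mode': 'NULLABLE'},
          {'name': 'score', 'type': 'FLOAT', 'mode': 'NULLABLE'},
      ]
  }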
create_disposition (BigQueryDisposition): A string describing what
happens if the table does not exist. Possible values are:
* :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
exist.
* :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
exist.
write_disposition (BigQueryDisposition): A string describing what happens
if the table has already some data. Possible values are:
* :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
* :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
* :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
empty.
For streaming pipelines, WRITE_TRUNCATE cannot be used.
kms_key (str): Optional Cloud KMS key name for use when creating new
tables.
batch_size (int): Number of rows to be written to BQ per streaming API
insert. The default is 500.
test_client: Override the default bigquery client used for testing.
max_file_size (int): The maximum size for a file to be written and then
loaded into BigQuery. The default value is 4TB, which is 80% of the
limit of 5TB for BigQuery to load any file.
max_partition_size (int): Maximum byte size for each load job to
BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only.
max_files_per_bundle (int): The maximum number of files to be
concurrently written by a worker. The default here is 20. Larger values
allow writing to multiple destinations without having to reshard, but
they increase the memory burden on the workers.
custom_gcs_temp_location (str): A GCS location to store files to be used
for file loads into BigQuery. By default, this will use the pipeline's
temp_location, but for pipelines whose temp_location is not appropriate
for BQ File Loads, users should pass a specific one.
method: The method to use to write to BigQuery. It may be
STREAMING_INSERTS, FILE_LOADS, STORAGE_WRITE_API or DEFAULT. An
introduction on loading data to BigQuery:
https://cloud.google.com/bigquery/docs/loading-data.
DEFAULT will use STREAMING_INSERTS on Streaming pipelines and
FILE_LOADS on Batch pipelines.
Note: FILE_LOADS currently does not support BigQuery's JSON data type:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
insert_retry_strategy: The strategy to use when retrying streaming inserts
into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs.
The default is to retry always, meaning that rows which fail to be
inserted into BigQuery will be retried indefinitely.
Other retry strategy settings will produce a dead-letter PCollection
as output (see the sketch after this list). Appropriate values are:
* `RetryStrategy.RETRY_ALWAYS`: retry all rows if
there are any kind of errors. Note that this will hold your pipeline
back if there are errors until you cancel or update it.
* `RetryStrategy.RETRY_NEVER`: rows with errors
will not be retried. Instead they will be output to a dead letter
queue under the `'FailedRows'` tag.
* `RetryStrategy.RETRY_ON_TRANSIENT_ERROR`: retry
rows with transient errors (e.g. timeouts). Rows with permanent errors
will be output to dead letter queue under `'FailedRows'` tag.
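For example, a minimal sketch of the dead-letter pattern, assuming the
failed rows are read back via the ``'FailedRows'`` tag described above
and that ``rows`` is an existing PCollection of dicts::

  import apache_beam as beam
  from apache_beam.io.gcp.bigquery_tools import RetryStrategy

  result = (
      rows
      | 'WriteToBQ' >> beam.io.WriteToBigQuery(
          table='my_dataset.my_table',
          insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR))

  # Rows with permanent errors are emitted under the 'FailedRows' tag.
  _ = result['FailedRows'] | 'LogFailedRows' >> beam.Map(print)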
additional_bq_parameters (dict, callable): Additional parameters to pass
to BQ when creating / loading data into a table. If a callable, it
should be a function that receives a table reference indicating
the destination and returns a dictionary.
These can be 'timePartitioning', 'clustering', etc. They are passed
directly to the job load configuration. See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
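For example, a sketch using illustrative partitioning and clustering
settings (the field names are placeholders)::

  additional_bq_parameters={
      'timePartitioning': {'type': 'DAY', 'field': 'created_at'},
      'clustering': {'fields': ['country']},
  }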
table_side_inputs (tuple): A tuple with ``AsSideInput`` PCollections to be
passed to the table callable (if one is provided).
schema_side_inputs: A tuple with ``AsSideInput`` PCollections to be
passed to the schema callable (if one is provided).
triggering_frequency (float):
When method is FILE_LOADS:
Value will be converted to int. Every triggering_frequency seconds, a
BigQuery load job will be triggered for all the data written since the
last load job. BigQuery has limits on how many load jobs can be
triggered per day, so be careful not to set this duration too low, or
you may exceed daily quota. Often this is set to 5 or 10 minutes to
ensure that the project stays well under the BigQuery quota. See
https://cloud.google.com/bigquery/quota-policy for more information
about BigQuery quotas.
When method is STREAMING_INSERTS and with_auto_sharding=True:
A streaming inserts batch will be submitted at least every
triggering_frequency seconds when data is waiting. The batch can be
sent earlier if it reaches the maximum batch size set by batch_size.
Default value is 0.2 seconds.
When method is STORAGE_WRITE_API:
A stream of rows will be committed every triggering_frequency seconds.
By default, this will be 5 seconds to ensure exactly-once semantics.
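For example, a sketch of a streaming pipeline that triggers a load job
roughly every five minutes::

  method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
  triggering_frequency=300,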
use_at_least_once: Intended only for STORAGE_WRITE_API. When True, will
use at-least-once semantics. This is cheaper and provides lower
latency, but will potentially duplicate records.
validate: Indicates whether to perform validation checks on
inputs. This parameter is primarily used for testing.
temp_file_format: The format to use for file loads into BigQuery. The
options are NEWLINE_DELIMITED_JSON or AVRO, with NEWLINE_DELIMITED_JSON
being used by default. For advantages and limitations of the two
formats, see
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
and
https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.
ignore_insert_ids: When using the STREAMING_INSERTS method to write data
to BigQuery, `insert_ids` are a BigQuery feature that supports
deduplication of events. If your use case is not sensitive to
duplication of data inserted to BigQuery, set `ignore_insert_ids`
to True to increase the throughput for BQ writing. See:
https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication
with_auto_sharding: Experimental. If true, enables using a dynamically
determined number of shards to write to BigQuery. This can be used for
all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
applicable to unbounded input.
num_storage_api_streams: Specifies the number of write streams that the
Storage API sink will use. This parameter is only applicable when
writing unbounded data.
ignore_unknown_columns: Accept rows that contain values that do not match
the schema. The unknown values are ignored. Default is False,
which treats unknown values as errors. This option is only valid for
method=STREAMING_INSERTS. See reference:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
load_job_project_id: Specifies an alternate GCP project id to use for
billing Batch File Loads. By default, the project id of the table is
used.
num_streaming_keys: The number of shards per destination when writing via
streaming inserts.
expansion_service: The address (host:port) of the expansion service.
If no expansion service is provided, will attempt to run the default
GCP expansion service. Used for STORAGE_WRITE_API method.
max_insert_payload_size: The maximum byte size for a BigQuery legacy
streaming insert payload.
use_cdc_writes: Configures the use of CDC writes to BigQuery. When set
to True, the Beam Rows are sent as-is to the BigQuery sink, which
expects each element to carry 'record' and 'row_mutation_info'
properties. Used for STORAGE_WRITE_API, working in 'at least once'
mode.
primary_key: When using CDC writes to BigQuery with CREATE_IF_NEEDED
mode for the underlying tables, a list of column names must be
configured as the primary key. Used for STORAGE_WRITE_API, working in
'at least once' mode.
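For example, a minimal sketch of a CDC write with the Storage Write API
(the table name and primary-key column are placeholders, and each
element of ``rows`` is assumed to already carry the 'record' and
'row_mutation_info' properties described above)::

  rows | beam.io.WriteToBigQuery(
      table='my_dataset.my_table',
      method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
      use_at_least_once=True,
      use_cdc_writes=True,
      primary_key=['user_id'])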
"""
self._table = table
self._dataset = dataset
self._project = project
self.table_reference = bigquery_tools.parse_table_reference(
table, dataset, project)
self.create_disposition = BigQueryDisposition.validate_create(
create_disposition)
self.write_disposition = BigQueryDisposition.validate_write(
write_disposition)
if schema == SCHEMA_AUTODETECT:
self.schema = schema
else:
self.schema = bigquery_tools.get_dict_table_schema(schema)
self.batch_size = batch_size
self.kms_key = kms_key
self.test_client = test_client
# TODO(pabloem): Consider handling ValueProvider for this location.
self.custom_gcs_temp_location = custom_gcs_temp_location
self.max_file_size = max_file_size
self.max_partition_size = max_partition_size
self.max_files_per_bundle = max_files_per_bundle
self.method = method or WriteToBigQuery.Method.DEFAULT
self.triggering_frequency = triggering_frequency
self.use_at_least_once = use_at_least_once
self.expansion_service = expansion_service
self.with_auto_sharding = with_auto_sharding
self._num_storage_api_streams = num_storage_api_streams
self.insert_retry_strategy = insert_retry_strategy
self._validate = validate
self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON
self.additional_bq_parameters = additional_bq_parameters or {}
self.table_side_inputs = table_side_inputs or ()
self.schema_side_inputs = schema_side_inputs or ()
self._ignore_insert_ids = ignore_insert_ids
self._ignore_unknown_columns = ignore_unknown_columns
self.load_job_project_id = load_job_project_id
self._max_insert_payload_size = max_insert_payload_size
self._num_streaming_keys = num_streaming_keys
self._use_cdc_writes = use_cdc_writes
self._primary_key = primary_key