def __init__()

in sdks/python/apache_beam/io/gcp/bigquery.py


  def __init__(
      self,
      table,
      dataset=None,
      project=None,
      schema=None,
      create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=BigQueryDisposition.WRITE_APPEND,
      kms_key=None,
      batch_size=None,
      max_file_size=None,
      max_partition_size=None,
      max_files_per_bundle=None,
      test_client=None,
      custom_gcs_temp_location=None,
      method=None,
      insert_retry_strategy=None,
      additional_bq_parameters=None,
      table_side_inputs=None,
      schema_side_inputs=None,
      triggering_frequency=None,
      use_at_least_once=False,
      validate=True,
      temp_file_format=None,
      ignore_insert_ids=False,
      # TODO(https://github.com/apache/beam/issues/20712): Switch the default
      # when the feature is mature.
      with_auto_sharding=False,
      num_storage_api_streams=0,
      ignore_unknown_columns=False,
      load_job_project_id=None,
      max_insert_payload_size=MAX_INSERT_PAYLOAD_SIZE,
      num_streaming_keys=DEFAULT_SHARDS_PER_DESTINATION,
      use_cdc_writes: bool = False,
      primary_key: List[str] = None,
      expansion_service=None):
    """Initialize a WriteToBigQuery transform.

    Args:
      table (str, callable, ValueProvider): The ID of the table, or a callable
         that returns it. The ID must contain only letters ``a-z``, ``A-Z``,
         numbers ``0-9``, or connectors ``-_``. If the dataset argument is
         :data:`None` then the table argument must contain the entire table
         reference specified as: ``'DATASET.TABLE'``
         or ``'PROJECT:DATASET.TABLE'``. If it's a callable, it must receive one
         argument representing an element to be written to BigQuery, and return
         a TableReference, or a string table name as specified above.
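         As a minimal sketch (the routing rule and table names below are
         purely illustrative), a callable destination could look like::

           def route_element_to_table(element):
             # Hypothetical rule: send error events to a dedicated table.
             if element.get('severity') == 'ERROR':
               return 'my_project:my_dataset.error_events'
             return 'my_project:my_dataset.events'

           # Passed as WriteToBigQuery(table=route_element_to_table, ...).
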
      dataset (str): The ID of the dataset containing this table or
        :data:`None` if the table reference is specified entirely by the table
        argument.
      project (str): The ID of the project containing this table or
        :data:`None` if the table reference is specified entirely by the table
        argument.
      schema (str,dict,ValueProvider,callable): The schema to be used if the
        BigQuery table to write has to be created. This can be specified as a
        :class:`~apache_beam.io.gcp.internal.clients.bigquery.\
bigquery_v2_messages.TableSchema` object, a Python dictionary describing the
        schema, a `ValueProvider` holding a JSON string or a dictionary,
        or a single string of the form
        ``'field1:type1,field2:type2,field3:type3'`` that defines a comma
        separated list of fields. Here ``'type'`` should specify the BigQuery
        type of the field. Single string based schemas do not support nested
        fields, repeated fields, or specifying a BigQuery mode for fields
        (mode will always be set to ``'NULLABLE'``).
        If a callable, then it should receive a destination (in the form of
        a str) and return a str, dict or TableSchema.
        One may also pass ``SCHEMA_AUTODETECT`` here when using JSON-based
        file loads, and BigQuery will try to infer the schema for the files
        that are being loaded.
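        As an illustrative sketch (the field names are hypothetical), a
        single-string schema could be passed as::

          schema='user_id:INTEGER,user_name:STRING,created_at:TIMESTAMP'

        and a dictionary schema, mirroring the TableSchema JSON layout,
        could look like::

          schema={'fields': [
              {'name': 'user_id', 'type': 'INTEGER', 'mode': 'NULLABLE'},
              {'name': 'user_name', 'type': 'STRING', 'mode': 'NULLABLE'},
              {'name': 'created_at', 'type': 'TIMESTAMP', 'mode': 'NULLABLE'},
          ]}
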
      create_disposition (BigQueryDisposition): A string describing what
        happens if the table does not exist. Possible values are:

        * :attr:`BigQueryDisposition.CREATE_IF_NEEDED`: create if does not
          exist.
        * :attr:`BigQueryDisposition.CREATE_NEVER`: fail the write if does not
          exist.

      write_disposition (BigQueryDisposition): A string describing what happens
        if the table already has some data. Possible values are:

        * :attr:`BigQueryDisposition.WRITE_TRUNCATE`: delete existing rows.
        * :attr:`BigQueryDisposition.WRITE_APPEND`: add to existing rows.
        * :attr:`BigQueryDisposition.WRITE_EMPTY`: fail the write if table not
          empty.

        For streaming pipelines, ``WRITE_TRUNCATE`` cannot be used.
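        As a sketch, a batch write that creates the table if needed and
        replaces any existing rows (the table name and schema below are
        illustrative)::

          import apache_beam as beam

          beam.io.WriteToBigQuery(
              table='my_project:my_dataset.daily_report',
              schema='day:DATE,total:INTEGER',
              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
              write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
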
      kms_key (str): Optional Cloud KMS key name for use when creating new
        tables.
      batch_size (int): Number of rows to be written to BQ per streaming API
        insert. The default is 500.
      test_client: Override the default bigquery client used for testing.
      max_file_size (int): The maximum size for a file to be written and then
        loaded into BigQuery. The default value is 4TB, which is 80% of the
        limit of 5TB for BigQuery to load any file.
      max_partition_size (int): Maximum byte size for each load job to
        BigQuery. Defaults to 15TB. Applicable to FILE_LOADS only.
      max_files_per_bundle (int): The maximum number of files to be concurrently
        written by a worker. The default here is 20. Larger values will allow
        writing to multiple destinations without having to reshard - but they
        increase the memory burden on the workers.
      custom_gcs_temp_location (str): A GCS location to store files to be used
        for file loads into BigQuery. By default, this will use the pipeline's
        temp_location, but for pipelines whose temp_location is not appropriate
        for BQ File Loads, users should pass a specific one.
      method: The method to use to write to BigQuery. It may be
        STREAMING_INSERTS, FILE_LOADS, STORAGE_WRITE_API or DEFAULT. An
        introduction on loading data to BigQuery:
        https://cloud.google.com/bigquery/docs/loading-data.
        DEFAULT will use STREAMING_INSERTS on Streaming pipelines and
        FILE_LOADS on Batch pipelines.
        Note: FILE_LOADS currently does not support BigQuery's JSON data type:
        https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
      insert_retry_strategy: The strategy to use when retrying streaming inserts
        into BigQuery. Options are shown in bigquery_tools.RetryStrategy attrs.
        Default is to retry always. This means that whenever there are rows
        that fail to be inserted to BigQuery, they will be retried indefinitely.
        Other retry strategy settings will produce a dead-letter PCollection
        as output. Appropriate values are:

        * `RetryStrategy.RETRY_ALWAYS`: retry all rows if
          there are any kind of errors. Note that this will hold your pipeline
          back if there are errors until you cancel or update it.
        * `RetryStrategy.RETRY_NEVER`: rows with errors
          will not be retried. Instead they will be output to a dead letter
          queue under the `'FailedRows'` tag.
        * `RetryStrategy.RETRY_ON_TRANSIENT_ERROR`: retry
          rows with transient errors (e.g. timeouts). Rows with permanent errors
          will be output to dead letter queue under `'FailedRows'` tag.
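
        A configuration sketch (the table name is illustrative; rows that are
        not retried are emitted under the `'FailedRows'` tag described
        above)::

          from apache_beam.io.gcp.bigquery_tools import RetryStrategy

          beam.io.WriteToBigQuery(
              table='my_project:my_dataset.events',
              insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR)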

      additional_bq_parameters (dict, callable): Additional parameters to pass
        to BQ when creating / loading data into a table. If a callable, it
        should be a function that receives a table reference indicating
        the destination and returns a dictionary.
        These can be 'timePartitioning', 'clustering', etc. They are passed
        directly to the job load configuration. See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
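        For instance, a sketch requesting daily time partitioning and
        clustering (the field names are illustrative)::

          beam.io.WriteToBigQuery(
              table='my_project:my_dataset.events',
              additional_bq_parameters={
                  'timePartitioning': {'type': 'DAY', 'field': 'event_ts'},
                  'clustering': {'fields': ['user_id']},
              })
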
      table_side_inputs (tuple): A tuple with ``AsSideInput`` PCollections to be
        passed to the table callable (if one is provided).
      schema_side_inputs: A tuple with ``AsSideInput`` PCollections to be
        passed to the schema callable (if one is provided).
      triggering_frequency (float):
        When method is FILE_LOADS:
        Value will be converted to int. Every triggering_frequency seconds, a
        BigQuery load job will be triggered for all the data written since the
        last load job. BigQuery has limits on how many load jobs can be
        triggered per day, so be careful not to set this duration too low, or
        you may exceed daily quota. Often this is set to 5 or 10 minutes to
        ensure that the project stays well under the BigQuery quota. See
        https://cloud.google.com/bigquery/quota-policy for more information
        about BigQuery quotas.

        When method is STREAMING_INSERTS and with_auto_sharding=True:
        A streaming inserts batch will be submitted at least every
        triggering_frequency seconds when data is waiting. The batch can be
        sent earlier if it reaches the maximum batch size set by batch_size.
        Default value is 0.2 seconds.

        When method is STORAGE_WRITE_API:
        A stream of rows will be committed every triggering_frequency seconds.
        By default, this will be 5 seconds to ensure exactly-once semantics.
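        As a sketch, a streaming pipeline that triggers a load job every ten
        minutes (the table name is illustrative)::

          beam.io.WriteToBigQuery(
              table='my_project:my_dataset.events',
              method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
              triggering_frequency=600)
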
      use_at_least_once: Intended only for STORAGE_WRITE_API. When True, will
        use at-least-once semantics. This is cheaper and provides lower
        latency, but will potentially duplicate records.
      validate: Indicates whether to perform validation checks on
        inputs. This parameter is primarily used for testing.
      temp_file_format: The format to use for file loads into BigQuery. The
        options are NEWLINE_DELIMITED_JSON or AVRO, with NEWLINE_DELIMITED_JSON
        being used by default. For advantages and limitations of the two
        formats, see
        https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-avro
        and
        https://cloud.google.com/bigquery/docs/loading-data-cloud-storage-json.
      ignore_insert_ids: When using the STREAMING_INSERTS method to write data
        to BigQuery, `insert_ids` are a feature of BigQuery that supports
        deduplication of events. If your use case is not sensitive to
        duplication of data inserted to BigQuery, set `ignore_insert_ids`
        to True to increase the throughput for BQ writing. See:
        https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication
      with_auto_sharding: Experimental. If true, enables using a dynamically
        determined number of shards to write to BigQuery. This can be used for
        all of FILE_LOADS, STREAMING_INSERTS, and STORAGE_WRITE_API. Only
        applicable to unbounded input.
      num_storage_api_streams: Specifies the number of write streams that the
        Storage API sink will use. This parameter is only applicable when
        writing unbounded data.
      ignore_unknown_columns: Accept rows that contain values that do not match
        the schema. The unknown values are ignored. Default is False,
        which treats unknown values as errors. This option is only valid for
        method=STREAMING_INSERTS. See reference:
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll
      load_job_project_id: Specifies an alternate GCP project id to bill for
        batch File Loads jobs. By default, the project id of the table is
        used.
      num_streaming_keys: The number of shards per destination when writing via
        streaming inserts.
      expansion_service: The address (host:port) of the expansion service.
        If no expansion service is provided, will attempt to run the default
        GCP expansion service. Used for STORAGE_WRITE_API method.
      max_insert_payload_size: The maximum byte size for a BigQuery legacy
        streaming insert payload.
      use_cdc_writes: Configure the usage of CDC writes on BigQuery.
        When set to True, the Beam Rows are sent as they are to the BigQuery
        sink, which expects each row to carry 'record' and 'row_mutation_info'
        properties.
        Used for STORAGE_WRITE_API, working on 'at least once' mode.
      primary_key: When using CDC writes on BigQuery with CREATE_IF_NEEDED
        mode for the underlying tables, a list of column names is required to
        be configured as the primary key. Used for STORAGE_WRITE_API, working
        on 'at least once' mode.
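        A configuration sketch for CDC writes (the table name and key column
        are illustrative; each incoming Beam Row is expected to carry the
        'record' and 'row_mutation_info' properties described above)::

          beam.io.WriteToBigQuery(
              table='my_project:my_dataset.users',
              method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
              use_at_least_once=True,
              use_cdc_writes=True,
              primary_key=['user_id'])
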
    """
    self._table = table
    self._dataset = dataset
    self._project = project
    self.table_reference = bigquery_tools.parse_table_reference(
        table, dataset, project)
    self.create_disposition = BigQueryDisposition.validate_create(
        create_disposition)
    self.write_disposition = BigQueryDisposition.validate_write(
        write_disposition)
    if schema == SCHEMA_AUTODETECT:
      self.schema = schema
    else:
      self.schema = bigquery_tools.get_dict_table_schema(schema)
    self.batch_size = batch_size
    self.kms_key = kms_key
    self.test_client = test_client

    # TODO(pabloem): Consider handling ValueProvider for this location.
    self.custom_gcs_temp_location = custom_gcs_temp_location
    self.max_file_size = max_file_size
    self.max_partition_size = max_partition_size
    self.max_files_per_bundle = max_files_per_bundle
    self.method = method or WriteToBigQuery.Method.DEFAULT
    self.triggering_frequency = triggering_frequency
    self.use_at_least_once = use_at_least_once
    self.expansion_service = expansion_service
    self.with_auto_sharding = with_auto_sharding
    self._num_storage_api_streams = num_storage_api_streams
    self.insert_retry_strategy = insert_retry_strategy
    self._validate = validate
    self._temp_file_format = temp_file_format or bigquery_tools.FileFormat.JSON

    self.additional_bq_parameters = additional_bq_parameters or {}
    self.table_side_inputs = table_side_inputs or ()
    self.schema_side_inputs = schema_side_inputs or ()
    self._ignore_insert_ids = ignore_insert_ids
    self._ignore_unknown_columns = ignore_unknown_columns
    self.load_job_project_id = load_job_project_id
    self._max_insert_payload_size = max_insert_payload_size
    self._num_streaming_keys = num_streaming_keys
    self._use_cdc_writes = use_cdc_writes
    self._primary_key = primary_key