def _use_existing_table()

in providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py [0:0]


    def _use_existing_table(self):
        """
        Build the BigQuery ``load`` job configuration for ``destination_project_dataset_table``.

        The configuration is stored on ``self.configuration`` and also returned. As side
        effects, ``self.time_partitioning`` and ``self.src_fmt_configs`` are replaced with the
        values returned by ``_cleanse_time_partitioning`` and ``_validate_src_fmt_configs``,
        and a non-empty ``self.schema_update_options`` is normalised to a ``list``.

        :return: the dict assigned to ``self.configuration``; it has a single ``"load"`` key.
        :raises ValueError: if ``schema_update_options`` contains an option other than
            ``ALLOW_FIELD_ADDITION`` / ``ALLOW_FIELD_RELAXATION``, or if it is non-empty while
            ``write_disposition`` is neither ``WRITE_APPEND`` nor ``WRITE_TRUNCATE``.
        :raises KeyError: if ``source_format`` has no entry in the per-format option mapping
            defined in this method.
        """
        destination_project_id, destination_dataset, destination_table = self.hook.split_tablename(
            table_input=self.destination_project_dataset_table,
            default_project_id=self.hook.project_id,
            var_name="destination_project_dataset_table",
        )

        # BigQuery can change the destination table's schema as a side effect of a load.
        # For more details:
        # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
        allowed_schema_update_options = ["ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"]
        # ``None`` means "no options"; guard it here so validation does not die with a
        # TypeError from ``set(None)`` before the truthiness check further down is reached.
        requested_schema_update_options = set(self.schema_update_options or ())
        if not requested_schema_update_options.issubset(allowed_schema_update_options):
            raise ValueError(
                f"{self.schema_update_options} contains invalid schema update options. "
                f"Please only use one or more of the following options: {allowed_schema_update_options}"
            )

        self.configuration = {
            "load": {
                "autodetect": self.autodetect,
                "createDisposition": self.create_disposition,
                "destinationTable": {
                    "projectId": destination_project_id,
                    "datasetId": destination_dataset,
                    "tableId": destination_table,
                },
                "sourceFormat": self.source_format,
                "sourceUris": self.source_uris,
                "writeDisposition": self.write_disposition,
                "ignoreUnknownValues": self.ignore_unknown_values,
            },
        }
        # Alias of the nested dict: every optional setting below is written into it in place.
        load_config = self.configuration["load"]

        self.time_partitioning = self._cleanse_time_partitioning(
            self.destination_project_dataset_table, self.time_partitioning
        )
        if self.time_partitioning:
            load_config["timePartitioning"] = self.time_partitioning

        if self.cluster_fields:
            load_config["clustering"] = {"fields": self.cluster_fields}

        if self.schema_fields:
            load_config["schema"] = {"fields": self.schema_fields}

        if self.schema_update_options:
            if self.write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
                raise ValueError(
                    "schema_update_options is only "
                    "allowed if write_disposition is "
                    "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
                )
            # Normalise tuples/sets to a list before handing the options to the job config.
            self.schema_update_options = list(self.schema_update_options)
            self.log.info("Adding experimental 'schemaUpdateOptions': %s", self.schema_update_options)
            load_config["schemaUpdateOptions"] = self.schema_update_options

        # A falsy value (``0`` / ``None``) is simply left out of the configuration.
        if self.max_bad_records:
            load_config["maxBadRecords"] = self.max_bad_records

        if self.encryption_configuration:
            load_config["destinationEncryptionConfiguration"] = self.encryption_configuration

        if self.labels or self.description:
            destination_table_properties = {}
            if self.labels:
                destination_table_properties["labels"] = self.labels
            if self.description:
                destination_table_properties["description"] = self.description
            load_config["destinationTableProperties"] = destination_table_properties

        # Load options accepted for each source format; a format missing here raises KeyError.
        src_fmt_to_configs_mapping = {
            "CSV": [
                "allowJaggedRows",
                "allowQuotedNewlines",
                "autodetect",
                "fieldDelimiter",
                "skipLeadingRows",
                "ignoreUnknownValues",
                "nullMarker",
                "quote",
                "encoding",
                "preserveAsciiControlCharacters",
                "columnNameCharacterMap",
            ],
            "DATASTORE_BACKUP": ["projectionFields"],
            "NEWLINE_DELIMITED_JSON": ["autodetect", "ignoreUnknownValues"],
            "PARQUET": ["autodetect", "ignoreUnknownValues"],
            "AVRO": ["useAvroLogicalTypes"],
            "ORC": ["autodetect"],
        }
        valid_configs = src_fmt_to_configs_mapping[self.source_format]

        # if following fields are not specified in src_fmt_configs,
        # honor the top-level params for backward-compatibility
        backward_compatibility_configs = {
            "skipLeadingRows": self.skip_leading_rows,
            "fieldDelimiter": self.field_delimiter,
            "ignoreUnknownValues": self.ignore_unknown_values,
            "quote": self.quote_character,
            "allowQuotedNewlines": self.allow_quoted_newlines,
            "encoding": self.encoding,
        }

        self.src_fmt_configs = self._validate_src_fmt_configs(
            self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs
        )
        load_config.update(self.src_fmt_configs)

        # Applied last, so a truthy top-level ``allow_jagged_rows`` overrides any
        # ``allowJaggedRows`` value that came in through ``src_fmt_configs``.
        if self.allow_jagged_rows:
            load_config["allowJaggedRows"] = self.allow_jagged_rows
        return self.configuration