in providers/google/src/airflow/providers/google/cloud/transfers/gcs_to_bigquery.py
def _use_existing_table(self):
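    """Build and return the BigQuery load-job ``configuration`` dict for an existing destination table."""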
    destination_project_id, destination_dataset, destination_table = self.hook.split_tablename(
        table_input=self.destination_project_dataset_table,
        default_project_id=self.hook.project_id,
        var_name="destination_project_dataset_table",
    )
    # BigQuery also allows you to define how a table's schema should change
    # as a side effect of a load. For details, see:
    # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
    allowed_schema_update_options = ["ALLOW_FIELD_ADDITION", "ALLOW_FIELD_RELAXATION"]
    if not set(allowed_schema_update_options).issuperset(set(self.schema_update_options)):
        raise ValueError(
            f"{self.schema_update_options} contains invalid schema update options. "
            f"Please only use one or more of the following options: {allowed_schema_update_options}"
        )
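
    # Base load configuration; optional and format-specific settings are merged in below.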
    self.configuration = {
        "load": {
            "autodetect": self.autodetect,
            "createDisposition": self.create_disposition,
            "destinationTable": {
                "projectId": destination_project_id,
                "datasetId": destination_dataset,
                "tableId": destination_table,
            },
            "sourceFormat": self.source_format,
            "sourceUris": self.source_uris,
            "writeDisposition": self.write_disposition,
            "ignoreUnknownValues": self.ignore_unknown_values,
        },
    }
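
    # _cleanse_time_partitioning adds a default DAY partitioning type when the
    # destination table name carries a "$" partition decorator.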
    self.time_partitioning = self._cleanse_time_partitioning(
        self.destination_project_dataset_table, self.time_partitioning
    )
    if self.time_partitioning:
        self.configuration["load"].update({"timePartitioning": self.time_partitioning})

    if self.cluster_fields:
        self.configuration["load"].update({"clustering": {"fields": self.cluster_fields}})

    if self.schema_fields:
        self.configuration["load"]["schema"] = {"fields": self.schema_fields}
    if self.schema_update_options:
        if self.write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
            raise ValueError(
                "schema_update_options is only allowed if write_disposition is "
                "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
            )
        # Coerce to a list (it may be a tuple or set) for backward compatibility.
        self.schema_update_options = list(self.schema_update_options or [])
        self.log.info("Adding experimental 'schemaUpdateOptions': %s", self.schema_update_options)
        self.configuration["load"]["schemaUpdateOptions"] = self.schema_update_options
    if self.max_bad_records:
        self.configuration["load"]["maxBadRecords"] = self.max_bad_records

    if self.encryption_configuration:
        self.configuration["load"]["destinationEncryptionConfiguration"] = self.encryption_configuration
    if self.labels or self.description:
        self.configuration["load"].update({"destinationTableProperties": {}})
        if self.labels:
            self.configuration["load"]["destinationTableProperties"]["labels"] = self.labels
        if self.description:
            self.configuration["load"]["destinationTableProperties"]["description"] = self.description
    src_fmt_to_configs_mapping = {
        "CSV": [
            "allowJaggedRows",
            "allowQuotedNewlines",
            "autodetect",
            "fieldDelimiter",
            "skipLeadingRows",
            "ignoreUnknownValues",
            "nullMarker",
            "quote",
            "encoding",
            "preserveAsciiControlCharacters",
            "columnNameCharacterMap",
        ],
        "DATASTORE_BACKUP": ["projectionFields"],
        "NEWLINE_DELIMITED_JSON": ["autodetect", "ignoreUnknownValues"],
        "PARQUET": ["autodetect", "ignoreUnknownValues"],
        "AVRO": ["useAvroLogicalTypes"],
        "ORC": ["autodetect"],
    }
    valid_configs = src_fmt_to_configs_mapping[self.source_format]
    # If the following fields are not specified in src_fmt_configs,
    # honor the equivalent top-level params for backward compatibility.
    backward_compatibility_configs = {
        "skipLeadingRows": self.skip_leading_rows,
        "fieldDelimiter": self.field_delimiter,
        "ignoreUnknownValues": self.ignore_unknown_values,
        "quote": self.quote_character,
        "allowQuotedNewlines": self.allow_quoted_newlines,
        "encoding": self.encoding,
    }
    self.src_fmt_configs = self._validate_src_fmt_configs(
        self.source_format, self.src_fmt_configs, valid_configs, backward_compatibility_configs
    )

    self.configuration["load"].update(self.src_fmt_configs)
    if self.allow_jagged_rows:
        self.configuration["load"]["allowJaggedRows"] = self.allow_jagged_rows

    return self.configuration
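
# Illustrative only: elsewhere in the operator the returned configuration is
# submitted to BigQuery via the hook, roughly along the lines of
#
#     configuration = self._use_existing_table()
#     job = self.hook.insert_job(configuration=configuration, project_id=self.hook.project_id)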