in bigquery_etl/view/__init__.py [0:0]
def publish(self, target_project=None, dry_run=False, client=None):
    """
    Publish this view to BigQuery.

    If `target_project` is set, it will replace the project ID in the view definition.

    Args:
        target_project: Optional project ID to publish into. Views whose own
            project differs from the configured default project are skipped
            when this is set.
        dry_run: When true, the query job is configured as a dry run and the
            view's description, friendly name and labels are not touched.
        client: BigQuery client to use; a new `bigquery.Client()` is created
            when omitted.

    Returns:
        True when the view was published, validated (dry run) or skipped;
        False when the view definition is invalid or the view cannot be
        found after the query ran.

    Raises:
        BadRequest: When the query is rejected for a reason other than
            "Invalid snapshot time", or when the single retry fails as well.
    """
    path = str(self.path)

    if any(path.endswith(suffix) for suffix in self.skip_publish()):
        print(f"Skipping {self.path}")
        return True

    # avoid checking references since Jenkins might throw an exception:
    # https://github.com/mozilla/bigquery-etl/issues/2246
    is_publishable = (
        any(path.endswith(suffix) for suffix in self.skip_validation())
        or self._valid_view_naming()
    )
    if not is_publishable:
        print(
            f"Error publishing {self.path}. Invalid view definition.",
            file=sys.stderr,
        )
        return False

    client = client or bigquery.Client()
    sql = self.content
    target_view = self.target_view_identifier(target_project)

    if target_project:
        if self.project != ConfigLoader.get(
            "default", "project", fallback="moz-fx-data-shared-prod"
        ):
            print(f"Skipping {self.path} because --target-project is set")
            return True

        # We only change the first occurrence, which is in the target view name.
        # The prefix is matched non-greedily so that, when the project ID shows
        # up more than once on the same line, it really is the first occurrence
        # that gets replaced. The project ID is escaped so it is matched
        # literally, and a replacement function is used so `target_project` is
        # never interpreted as a replacement template.
        sql = re.sub(
            rf"^(?!--)(.*?){re.escape(self.project)}",
            lambda match: match.group(1) + target_project,
            sql,
            count=1,
            flags=re.MULTILINE,
        )

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False, dry_run=dry_run)
    query_job = client.query(sql, job_config)

    if dry_run:
        print(f"Validated definition of {target_view} in {self.path}")
        return True

    try:
        job_id = query_job.result().job_id
    except BadRequest as e:
        if "Invalid snapshot time" not in e.message:
            raise
        # This occasionally happens due to dependent views being
        # published concurrently; we wait briefly and give it one
        # extra try in this situation.
        time.sleep(1)
        job_id = client.query(sql, job_config).result().job_id

    try:
        table = client.get_table(target_view)
    except NotFound:
        print(
            f"{target_view} failed to publish to the correct location, verify job id {job_id}",
            file=sys.stderr,
        )
        return False

    # Updating field descriptions is best-effort: a failure is reported but
    # does not stop the view from being published.
    try:
        if self.schema_path.is_file():
            table = self.schema.deploy(target_view)
    except Exception as e:
        print(f"Could not update field descriptions for {target_view}: {e}")

    if not self.metadata:
        # The view itself has been created at this point. Without metadata
        # there is no description or friendly name to apply, so the metadata
        # sync (including labels) is skipped instead of failing on `None`.
        print(f"Missing metadata for {self.path}")
    else:
        table.description = self.metadata.description
        table.friendly_name = self.metadata.friendly_name
        update_fields = ["description", "friendly_name"]

        wanted_labels = self.labels
        if table.labels != wanted_labels:
            # Only string values are sent as label values.
            new_labels = {
                key: value
                for key, value in wanted_labels.items()
                if isinstance(value, str)
            }
            # To delete a label its value must be set to None. These markers
            # are added after the string filter so they are actually sent.
            new_labels.update(
                {key: None for key in table.labels if key not in wanted_labels}
            )
            table.labels = new_labels
            update_fields = ["labels", *update_fields]

        client.update_table(table, update_fields)

    print(f"Published view {target_view}")
    return True