in bigquery_etl/query_scheduling/task.py [0:0]
def with_upstream_dependencies(self, dag_collection):
    """Resolve and record this task's upstream dependencies.

    Maps every table referenced by this task to the task producing it
    (or to that task's accompanying fail-checks task) and stores the
    resulting task refs on ``self.upstream_dependencies``. When this task
    uses a date partition parameter, also derives
    ``self.date_partition_offset`` from the collected upstream refs.
    """
    # Already resolved — this method mutates self, so don't redo the work.
    if self.upstream_dependencies:
        return

    deps = []

    def _already_tracked(candidate_ref):
        # True when an equivalent dependency is declared or collected already.
        return any(
            existing.task_key == candidate_ref.task_key
            for existing in self.depends_on + deps
        )

    parent_task = None
    if self.is_dq_check or self.is_bigeye_check:
        # A check task always depends on the task producing the checked table.
        parent_task = dag_collection.task_for_table(
            self.project, self.dataset, f"{self.table}_{self.version}"
        )
        parent_ref = parent_task.to_ref(dag_collection)
        if not _already_tracked(parent_ref):
            deps.append(parent_ref)

    for project_id, dataset_id, table_id in self._get_referenced_tables():
        # If the upstream table is accompanied by a check task, the task
        # running the check becomes the upstream dependency instead.
        checks_task = dag_collection.fail_checks_task_for_table(
            project_id, dataset_id, table_id
        )
        bigeye_checks_task = dag_collection.fail_bigeye_checks_task_for_table(
            project_id, dataset_id, table_id
        )
        upstream_task = dag_collection.task_for_table(
            project_id, dataset_id, table_id
        )

        if upstream_task is None:
            # No scheduled task produces this table; fall back to the
            # statically configured external dependencies.
            for external_ref, patterns in EXTERNAL_TASKS.items():
                matched = any(
                    fnmatchcase(f"{dataset_id}.{table_id}", pattern)
                    for pattern in patterns
                )
                if matched:
                    if not _already_tracked(external_ref):
                        deps.append(external_ref)
                    break  # stop after the first match
            continue

        # Skip self-references and the check's own parent task.
        if upstream_task == self or upstream_task == parent_task:
            continue

        # Bigeye checks take precedence over regular fail-checks tasks.
        if bigeye_checks_task is not None:
            upstream_task = bigeye_checks_task
        elif checks_task is not None:
            upstream_task = checks_task

        if not _already_tracked(upstream_task.to_ref(dag_collection)):
            # Resolve the upstream task's own dependencies first so its
            # date_partition_offset is populated before the ref is snapshotted.
            upstream_task.with_upstream_dependencies(dag_collection)
            deps.append(upstream_task.to_ref(dag_collection))

    if (
        self.date_partition_parameter is not None
        and self.date_partition_offset is None
    ):
        # Adjust the submission_date parameter based on whether upstream
        # tasks carry date partition offsets; take the most negative one.
        offsets = [
            ref.date_partition_offset
            for ref in deps
            if ref.date_partition_offset
        ]
        if offsets:
            self.date_partition_offset = min(offsets)
            # unset the table_partition_template property if we have an offset
            # as that will be overridden in the template via `destination_table`
            self.table_partition_template = None
            date_partition_offset_task_keys = [
                ref.task_key
                for ref in deps
                if ref.date_partition_offset == self.date_partition_offset
            ]
            logging.info(
                f"Set {self.task_key} date partition offset"
                f" to {self.date_partition_offset}"
                f" based on {', '.join(date_partition_offset_task_keys)}."
            )

    self.upstream_dependencies = deps