def with_upstream_dependencies()

in bigquery_etl/query_scheduling/task.py [0:0]
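
Resolves the upstream tasks this task depends on, preferring dq/bigeye check tasks where an upstream table has them and falling back to statically configured external tasks, and propagates the smallest upstream date_partition_offset to this task.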


    def with_upstream_dependencies(self, dag_collection):
        """Perform a dry_run to get upstream dependencies."""
        if self.upstream_dependencies:
            return

        dependencies = []

        def _duplicate_dependency(task_ref):
            return any(
                d.task_key == task_ref.task_key for d in self.depends_on + dependencies
            )

        parent_task = None
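        # dq and bigeye check tasks always depend on the task that produces
        # the table they validate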
        if self.is_dq_check or self.is_bigeye_check:
            parent_task = dag_collection.task_for_table(
                self.project, self.dataset, f"{self.table}_{self.version}"
            )
            parent_task_ref = parent_task.to_ref(dag_collection)
            if not _duplicate_dependency(parent_task_ref):
                dependencies.append(parent_task_ref)

        for table in self._get_referenced_tables():
            # check whether the upstream task is accompanied by a dq or bigeye check;
            # if so, the task running the check becomes the upstream dependency instead
            checks_upstream_task = dag_collection.fail_checks_task_for_table(
                table[0], table[1], table[2]
            )
            bigeye_checks_upstream_task = (
                dag_collection.fail_bigeye_checks_task_for_table(
                    table[0], table[1], table[2]
                )
            )
            upstream_task = dag_collection.task_for_table(table[0], table[1], table[2])

            if upstream_task is not None:
                if upstream_task != self and upstream_task != parent_task:
                    if checks_upstream_task is not None:
                        upstream_task = checks_upstream_task

                    # when both a dq check and a bigeye check exist, the bigeye
                    # check takes precedence since it is assigned last
                    if bigeye_checks_upstream_task is not None:
                        upstream_task = bigeye_checks_upstream_task

                    task_ref = upstream_task.to_ref(dag_collection)
                    if not _duplicate_dependency(task_ref):
                        # Get its upstream dependencies so its date_partition_offset gets set.
                        upstream_task.with_upstream_dependencies(dag_collection)
                        task_ref = upstream_task.to_ref(dag_collection)
                        dependencies.append(task_ref)
            else:
                # fall back to statically configured external task dependencies
                for task_ref, patterns in EXTERNAL_TASKS.items():
                    if any(fnmatchcase(f"{table[1]}.{table[2]}", p) for p in patterns):
                        if not _duplicate_dependency(task_ref):
                            dependencies.append(task_ref)
                        break  # stop after the first match

        if (
            self.date_partition_parameter is not None
            and self.date_partition_offset is None
        ):
            # adjust submission_date parameter based on whether upstream tasks have
            # date partition offsets
            date_partition_offsets = [
                dependency.date_partition_offset
                for dependency in dependencies
                if dependency.date_partition_offset
            ]

            if len(date_partition_offsets) > 0:
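                # use the smallest (most negative) offset so this task's date
                # parameter lines up with the furthest-back upstream partition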
                self.date_partition_offset = min(date_partition_offsets)
                # unset the table_partition_template property if we have an offset
                # as that will be overridden in the template via `destination_table`
                self.table_partition_template = None
                date_partition_offset_task_keys = [
                    dependency.task_key
                    for dependency in dependencies
                    if dependency.date_partition_offset == self.date_partition_offset
                ]
                logging.info(
                    f"Set {self.task_key} date partition offset"
                    f" to {self.date_partition_offset}"
                    f" based on {', '.join(date_partition_offset_task_keys)}."
                )

        self.upstream_dependencies = dependencies
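
For context, a rough sketch of how this method might be driven when DAGs are generated; the DagCollection loading call and attribute names below are assumptions about the surrounding API, not taken from this file:

    # Hypothetical driver sketch: resolve upstream dependencies for every task
    # before rendering DAGs (constructor and attribute names are assumptions).
    dag_collection = DagCollection.from_file("dags.yaml")
    for dag in dag_collection.dags:
        for task in dag.tasks:
            # populates task.upstream_dependencies and may adjust
            # task.date_partition_offset based on upstream offsets
            task.with_upstream_dependencies(dag_collection)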