def execute()

in providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py [0:0]


    def execute(self, context: Context):
        """
        Start (or adopt) a dbt Cloud job run and, if requested, wait for its outcome.

        Steps performed, in order:

        1. Fill in a default ``trigger_reason`` when none was supplied.
        2. Resolve ``job_id`` through ``hook.get_job_by_name`` when it is not set.
        3. When ``reuse_existing_run`` is enabled, query the job's non-terminal runs
           (ordered by ``-created_at``) and adopt the first one returned, if any.
        4. Otherwise trigger a new run through ``hook.trigger_job_run``.
        5. Push the run URL to XCom under the ``job_run_url`` key.
        6. Depending on ``wait_for_termination`` and ``deferrable``: return immediately,
           block until the run finishes, or defer to ``DbtCloudRunJobTrigger``.

        :param context: Task execution context. ``context["ti"]`` supplies the try number
            and is used for the XCom push.
        :return: The id of the job run. ``None`` is returned when ``self.defer`` returns
            normally, or when a terminal status is neither success, cancelled nor error.
        :raises ValueError: If ``job_id`` is unset and any of ``project_name``,
            ``environment_name`` or ``job_name`` is missing/falsy.
        :raises DbtCloudJobRunException: If the run is found not to have succeeded.
        """
        if self.trigger_reason is None:
            default_reason = (
                f"Triggered via Apache Airflow by task {self.task_id!r} in the {self.dag.dag_id} DAG."
            )
            self.trigger_reason = default_reason

        # Without an explicit job id, the job has to be identifiable by its names.
        if self.job_id is None:
            name_lookup_fields = [self.project_name, self.environment_name, self.job_name]
            if not all(name_lookup_fields):
                raise ValueError(
                    "Either job_id or project_name, environment_name, and job_name must be provided."
                )
            matched_job = self.hook.get_job_by_name(
                account_id=self.account_id,
                project_name=self.project_name,
                environment_name=self.environment_name,
                job_name=self.job_name,
            )
            self.job_id = matched_job["id"]

        # Stays ``None`` unless run reuse is enabled, which forces a fresh trigger below.
        active_runs = None
        if self.reuse_existing_run:
            runs_response = self.hook.get_job_runs(
                account_id=self.account_id,
                payload={
                    "job_definition_id": self.job_id,
                    "status__in": str(list(DbtCloudJobRunStatus.NON_TERMINAL_STATUSES.value)),
                    "order_by": "-created_at",
                },
            )
            active_runs = runs_response.json()["data"]
            if active_runs:
                adopted_run = active_runs[0]
                self.run_id = adopted_run["id"]
                job_run_url = adopted_run["href"]

        task_instance = context["ti"]
        is_retry = task_instance.try_number != 1

        # Trigger a new run unless an in-flight run was adopted above.
        if not (self.reuse_existing_run and active_runs):
            trigger_job_response = self.hook.trigger_job_run(
                account_id=self.account_id,
                job_id=self.job_id,
                cause=self.trigger_reason,
                steps_override=self.steps_override,
                schema_override=self.schema_override,
                retry_from_failure=is_retry and self.retry_from_failure,
                additional_run_config=self.additional_run_config,
            )
            self.run_id = trigger_job_response.json()["data"]["id"]
            job_run_url = trigger_job_response.json()["data"]["href"]

        # Publish the run URL before any waiting starts, so the value is in XCom even if
        # the wait below fails; the operator link relies on it for monitoring the run.
        task_instance.xcom_push(key="job_run_url", value=job_run_url)

        # Fire-and-forget: nothing to wait for (or the run id is not an integer).
        if not self.wait_for_termination or not isinstance(self.run_id, int):
            if self.deferrable is True:
                warnings.warn(
                    "Argument `wait_for_termination` is False and `deferrable` is True , hence "
                    "`deferrable` parameter doesn't have any effect",
                    UserWarning,
                    stacklevel=2,
                )
            return self.run_id

        # Synchronous wait: poll through the hook until the run finishes or times out.
        if self.deferrable is False:
            self.log.info("Waiting for job run %s to terminate.", self.run_id)
            run_succeeded = self.hook.wait_for_job_run_status(
                run_id=self.run_id,
                account_id=self.account_id,
                expected_statuses=DbtCloudJobRunStatus.SUCCESS.value,
                check_interval=self.check_interval,
                timeout=self.timeout,
            )
            if not run_succeeded:
                raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")
            self.log.info("Job run %s has completed successfully.", self.run_id)
            return self.run_id

        # Deferrable wait: check the status once and defer only if the run is still going.
        end_time = time.time() + self.timeout
        job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id)
        job_run_status = self.hook.get_job_run_status(**job_run_info)

        if not DbtCloudJobRunStatus.is_terminal(job_run_status):
            run_trigger = DbtCloudRunJobTrigger(
                conn_id=self.dbt_cloud_conn_id,
                run_id=self.run_id,
                end_time=end_time,
                account_id=self.account_id,
                poll_interval=self.check_interval,
            )
            self.defer(
                timeout=self.execution_timeout,
                trigger=run_trigger,
                method_name="execute_complete",
            )
            return None

        if job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
            self.log.info("Job run %s has completed successfully.", self.run_id)
            return self.run_id

        if job_run_status in (
            DbtCloudJobRunStatus.CANCELLED.value,
            DbtCloudJobRunStatus.ERROR.value,
        ):
            raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")

        return None