in providers/dbt/cloud/src/airflow/providers/dbt/cloud/operators/dbt.py [0:0]
def execute(self, context: Context):
if self.trigger_reason is None:
self.trigger_reason = (
f"Triggered via Apache Airflow by task {self.task_id!r} in the {self.dag.dag_id} DAG."
)
if self.job_id is None:
if not all([self.project_name, self.environment_name, self.job_name]):
raise ValueError(
"Either job_id or project_name, environment_name, and job_name must be provided."
)
self.job_id = self.hook.get_job_by_name(
account_id=self.account_id,
project_name=self.project_name,
environment_name=self.environment_name,
job_name=self.job_name,
)["id"]
non_terminal_runs = None
if self.reuse_existing_run:
non_terminal_runs = self.hook.get_job_runs(
account_id=self.account_id,
payload={
"job_definition_id": self.job_id,
"status__in": str(list(DbtCloudJobRunStatus.NON_TERMINAL_STATUSES.value)),
"order_by": "-created_at",
},
).json()["data"]
if non_terminal_runs:
self.run_id = non_terminal_runs[0]["id"]
job_run_url = non_terminal_runs[0]["href"]
is_retry = context["ti"].try_number != 1
if not self.reuse_existing_run or not non_terminal_runs:
trigger_job_response = self.hook.trigger_job_run(
account_id=self.account_id,
job_id=self.job_id,
cause=self.trigger_reason,
steps_override=self.steps_override,
schema_override=self.schema_override,
retry_from_failure=is_retry and self.retry_from_failure,
additional_run_config=self.additional_run_config,
)
self.run_id = trigger_job_response.json()["data"]["id"]
job_run_url = trigger_job_response.json()["data"]["href"]
# Push the ``job_run_url`` value to XCom regardless of what happens during execution so that the job
# run can be monitored via the operator link.
context["ti"].xcom_push(key="job_run_url", value=job_run_url)
if self.wait_for_termination and isinstance(self.run_id, int):
if self.deferrable is False:
self.log.info("Waiting for job run %s to terminate.", self.run_id)
if self.hook.wait_for_job_run_status(
run_id=self.run_id,
account_id=self.account_id,
expected_statuses=DbtCloudJobRunStatus.SUCCESS.value,
check_interval=self.check_interval,
timeout=self.timeout,
):
self.log.info("Job run %s has completed successfully.", self.run_id)
else:
raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")
return self.run_id
end_time = time.time() + self.timeout
job_run_info = JobRunInfo(account_id=self.account_id, run_id=self.run_id)
job_run_status = self.hook.get_job_run_status(**job_run_info)
if not DbtCloudJobRunStatus.is_terminal(job_run_status):
self.defer(
timeout=self.execution_timeout,
trigger=DbtCloudRunJobTrigger(
conn_id=self.dbt_cloud_conn_id,
run_id=self.run_id,
end_time=end_time,
account_id=self.account_id,
poll_interval=self.check_interval,
),
method_name="execute_complete",
)
elif job_run_status == DbtCloudJobRunStatus.SUCCESS.value:
self.log.info("Job run %s has completed successfully.", self.run_id)
return self.run_id
elif job_run_status in (
DbtCloudJobRunStatus.CANCELLED.value,
DbtCloudJobRunStatus.ERROR.value,
):
raise DbtCloudJobRunException(f"Job run {self.run_id} has failed or has been cancelled.")
else:
if self.deferrable is True:
warnings.warn(
"Argument `wait_for_termination` is False and `deferrable` is True , hence "
"`deferrable` parameter doesn't have any effect",
UserWarning,
stacklevel=2,
)
return self.run_id