-- dbt_jobs_view.sql
-- Copyright 2023 Google LLC
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
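
-- Defines a per-invocation view of dbt activity in BigQuery: jobs are grouped
-- by dbt_invocation_id, key attributes are extracted from the dbt_payload JSON,
-- timing stats are computed, and deep links are generated to Airflow, Cloud
-- Storage, Cloud Build, and the generated dbt documentation.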
WITH
-- Group by dbt_invocation_id and extract any non-null dbt_payload. This
-- captures the payload information that should appear once per dbt
-- invocation (from the on-run hook).
SrcData AS (
  SELECT
    dbt.dbt_invocation_id,
    job.project_id,
    job.location,
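    -- Take one arbitrary non-null payload per invocation; SAFE_OFFSET returns
    -- NULL rather than erroring if no payload was captured.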
    ARRAY_AGG(dbt_payload IGNORE NULLS LIMIT 1)[SAFE_OFFSET(0)] AS dbt_payload,
    MIN(job_stats.create_time) AS create_time,
    MAX(job_stats.end_time) AS end_time
  FROM
    `${monitoring_dataset}.${job_table}` j
  WHERE
    dbt.dbt_invocation_id IS NOT NULL
  GROUP BY
    1, 2, 3
),
-- Extract key attributes from the dbt_payload. Note that additional standard
-- variables can be added to this extraction in the future.
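-- For illustration only (hypothetical values), a dbt_payload might look like:
--   {"project": "my_dbt_project",
--    "env": {"AIRFLOW_CTX_DAG_ID": "daily_load", "AIRFLOW_CTX_TASK_ID": "run_dbt",
--            "PROJECT_ID": "my-project", "GCS_DOCS_BUCKET": "my-docs-bucket", ...},
--    "args": {...}}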
ExpandedValues AS (
  SELECT
    dbt_invocation_id,
    project_id,
    location,
    STRUCT(
      -- Airflow execution context (if launched from Airflow)
      STRUCT(
        JSON_VALUE(dbt_payload, "$.env.AIRFLOW_BASE_URL") AS base_url,
        JSON_VALUE(dbt_payload, "$.env.AIRFLOW_CTX_DAG_ID") AS dag_id,
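        -- URL-encode ":" and "+" in the execution date so it can be embedded
        -- directly in the query strings of the links built below.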
        REPLACE(
          REPLACE(
            JSON_VALUE(dbt_payload, "$.env.AIRFLOW_CTX_EXECUTION_DATE"), ":", "%3A"),
          "+", "%2B") AS execution_date,
        JSON_VALUE(dbt_payload, "$.env.AIRFLOW_CTX_TASK_ID") AS task_id
      ) AS airflow,
      -- Standard Airflow-provided values (if launched from Airflow)
      STRUCT(
        JSON_VALUE(dbt_payload, "$.env.PROJECT_ID") AS project_id,
        JSON_VALUE(dbt_payload, "$.env.REGION") AS region,
        JSON_VALUE(dbt_payload, "$.env.BQ_LOCATION") AS bq_location,
        JSON_VALUE(dbt_payload, "$.env.GCS_DOCS_BUCKET") AS gcs_docs_bucket
      ) AS run_context,
      -- Build-provided values (if built with the example Dockerfile)
      STRUCT(
        JSON_VALUE(dbt_payload, "$.env.BUILD_LOCATION") AS build_location,
        JSON_VALUE(dbt_payload, "$.env.BUILD_REF") AS build_ref,
        JSON_VALUE(dbt_payload, "$.env.SOURCE_URL") AS source_url,
        JSON_VALUE(dbt_payload, "$.env.SOURCE_REF") AS source_ref,
        JSON_VALUE(dbt_payload, "$.env.SOURCE_PATH") AS source_path
      ) AS build,
      -- General parameters (for downstream extraction)
      JSON_VALUE(dbt_payload, "$.project") AS dbt_project,
      JSON_QUERY(dbt_payload, "$.env") AS env,
      JSON_QUERY(dbt_payload, "$.args") AS args
    ) AS dbt_invocation,
    -- Build some dbt_stats
    STRUCT(
      create_time,
      end_time,
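      -- Wall-clock duration of the invocation: first job created to last job
      -- finished.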
      TIMESTAMP_DIFF(end_time, create_time, SECOND) AS elapsed_secs
    ) AS dbt_stats
  FROM
    SrcData
),
-- Generate links from the data and join with the base BigQuery job information
WithLinks AS (
  SELECT
    dbt_invocation_id,
    project_id,
    location,
    dbt_invocation,
    dbt_stats,
    -- dbt-specific links
    STRUCT(
      -- Airflow DAG graph link for this execution
      CONCAT(dbt_invocation.airflow.base_url, "/graph?dag_id=", dbt_invocation.airflow.dag_id,
        "&execution_date=", dbt_invocation.airflow.execution_date) AS airflow_dag_link,
      -- Airflow task log link for this execution
      CONCAT(dbt_invocation.airflow.base_url, "/log?dag_id=", dbt_invocation.airflow.dag_id,
        "&task_id=", dbt_invocation.airflow.task_id,
        "&execution_date=", dbt_invocation.airflow.execution_date) AS airflow_logs_link,
      -- Airflow task info link for this execution
      CONCAT(dbt_invocation.airflow.base_url, "/task?dag_id=", dbt_invocation.airflow.dag_id,
        "&task_id=", dbt_invocation.airflow.task_id,
        "&execution_date=", dbt_invocation.airflow.execution_date) AS airflow_task_link,
      -- Browse the GCS bucket for this execution (target and logs)
      CONCAT("https://console.cloud.google.com/storage/browser/",
        dbt_invocation.run_context.gcs_docs_bucket, "/",
        dbt_invocation.airflow.dag_id, "/",
        dbt_invocation.airflow.task_id, "/",
        dbt_invocation.airflow.execution_date, "/dbt") AS dbt_archive,
      -- Generated documentation (if saved and converted to static_index.html --
      -- see the example_dbt job)
      CONCAT("https://storage.cloud.google.com/",
        dbt_invocation.run_context.gcs_docs_bucket, "/",
        dbt_invocation.airflow.dag_id, "/",
        dbt_invocation.airflow.task_id, "/",
        dbt_invocation.airflow.execution_date,
        "/dbt/target/static_index.html") AS dbt_docs,
      -- Generated documentation, as above, but deep-linked to the specific
      -- model for this job (if one is provided)
      CONCAT("https://storage.cloud.google.com/",
        dbt_invocation.run_context.gcs_docs_bucket, "/",
        dbt_invocation.airflow.dag_id, "/",
        dbt_invocation.airflow.task_id, "/",
        dbt_invocation.airflow.execution_date,
        "/dbt/target/static_index.html",
        "#!/model/", dbt.node_id) AS dbt_model_docs,
      -- Cloud Build link
      CONCAT("https://console.cloud.google.com/cloud-build/builds",
        ";region=", dbt_invocation.build.build_location,
        "/", dbt_invocation.build.build_ref) AS build_link,
      -- Source link (if provided)
      CONCAT(dbt_invocation.build.source_url,
        dbt_invocation.build.source_ref,
        "/", dbt_invocation.build.source_path) AS src_link
    ) AS dbt_links,
    j.* EXCEPT (dbt_payload)
  FROM
    `${monitoring_dataset}.bigquery_jobs` j
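  -- A single dbt invocation spans many BigQuery jobs; this join attaches the
  -- extracted invocation attributes and links to each of those jobs.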
  JOIN ExpandedValues e ON (
    j.job.project_id = e.project_id
    AND j.job.location = e.location
    AND j.dbt.dbt_invocation_id = e.dbt_invocation_id
  )
)
SELECT
  *
FROM
  WithLinks
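
-- Example usage (hypothetical project and dataset names):
--   SELECT dbt_invocation_id, dbt_stats.elapsed_secs, dbt_links.airflow_dag_link
--   FROM `my-project.monitoring.dbt_jobs_view`
--   ORDER BY dbt_stats.create_time DESC
--   LIMIT 10;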