common_components/monitoring/dbt_perf.py (75 lines of code) (raw):

#!/usr/bin/python3 # Copyright 2022 The Reg Reporting Blueprint Authors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # https://www.apache.org/licenses/LICENSE-2.0 # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Display recent DBT invocations and extract performance graphs from DBT. This is intended to be used with json_to_dot.py which can convert these performance graphs into a DOT graph for graphviz. """ import json import argparse import os try: from google.cloud import bigquery except ModuleNotFoundError: print('Please install google-cloud-bigquery by pip3') print('For example, running "pip3 install google-cloud-bigquery"') raise SystemExit def dbt_recent_invocations_query(full_table_id, num): """Generate query for extracting DBT recent invocations""" return f""" SELECT dbt_invocation_id, MIN(IF(stage='START', update_time, NULL)) AS start_time, MAX(IF(stage='END', update_time, NULL)) AS end_time FROM `{full_table_id}` GROUP BY 1 ORDER BY start_time DESC LIMIT {num}; """ TIME_FORMAT = '%Y-%m-%d %H:%M:%S' def dbt_recent_invocations(client, project_id, dataset_id, table_id, num): """Display recent DBT invocations""" full_table_id = f'{project_id}.{dataset_id}.{table_id}' results = client.query(dbt_recent_invocations_query(full_table_id, num)) print('dbt_invocation_id start_date end_date') for row in results: end_time = '' if row['end_time']: end_time = row['end_time'].strftime(TIME_FORMAT) print(row['dbt_invocation_id'], row['start_time'].strftime(TIME_FORMAT), end_time) def dbt_invocation_query(full_table_id, dbt_invocation_id): """Generate query for extracting DBT performance JSON graph from DBT log""" return f""" WITH StartStage AS ( SELECT dbt_invocation_id, JSON_EXTRACT_SCALAR(n, "$.id") AS name, JSON_EXTRACT_STRING_ARRAY(n, "$.depends_on") AS depends_on FROM `{full_table_id}` AS l CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(l.info, "$.tree")) AS n WHERE stage='START' AND dbt_invocation_id='{dbt_invocation_id}' ), EndStage AS ( SELECT dbt_invocation_id, JSON_EXTRACT_SCALAR(n, "$.id") AS name, JSON_EXTRACT_SCALAR(t, "$.started_at") AS start_time, JSON_EXTRACT_SCALAR(t, "$.completed_at") AS end_time, FROM `{full_table_id}` AS l CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(l.info, "$")) AS n CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(n, "$.timing")) AS t WHERE stage='END' AND JSON_EXTRACT_SCALAR(n, "$.status")='success' AND dbt_invocation_id='{dbt_invocation_id}' ), Combined AS ( SELECT dbt_invocation_id, TO_JSON(ARRAY_AGG( STRUCT(name, depends_on, start_time, end_time) )) AS perf FROM StartStage s LEFT JOIN EndStage e USING (dbt_invocation_id, name) GROUP BY 1 ) SELECT perf FROM Combined; """ def dbt_live_invocation_query(full_table_id, project_id, location, dbt_invocation_id): """Generate query for extracting DBT performance JSON graph from live data""" return rf""" WITH StartStage AS ( SELECT dbt_invocation_id, update_time, JSON_EXTRACT_SCALAR(n, "$.id") AS name, JSON_EXTRACT_STRING_ARRAY(n, "$.depends_on") AS depends_on FROM `{full_table_id}` AS l CROSS JOIN UNNEST(JSON_EXTRACT_ARRAY(l.info, "$.tree")) AS n WHERE stage='START' AND dbt_invocation_id='{dbt_invocation_id}' ), CreationTime AS ( SELECT MIN(update_time) AS creation_time FROM StartStage ), InfoSchema AS ( SELECT (SELECT ANY_VALUE(value) FROM j.labels WHERE key='dbt_invocation_id') AS dbt_invocation_id, DATETIME(j.creation_time) AS start_time, DATETIME(end_time) AS end_time, JSON_VALUE(PARSE_JSON(REGEXP_EXTRACT(query, r'^/\* (.*) \*/')), "$.node_id") AS name, FROM CreationTime ct CROSS JOIN `{project_id}.region-{location}.INFORMATION_SCHEMA.JOBS_BY_PROJECT` j WHERE j.creation_time >= ct.creation_time AND state='DONE' AND (SELECT ANY_VALUE(value) FROM j.labels WHERE key='dbt_invocation_id')='{dbt_invocation_id}' ), Combined AS ( SELECT dbt_invocation_id, TO_JSON(ARRAY_AGG( STRUCT(name, depends_on, start_time, end_time) )) AS perf FROM StartStage s LEFT JOIN InfoSchema e USING (dbt_invocation_id, name) GROUP BY 1 ) SELECT perf FROM Combined; """ def dbt_dump_invocation(client, project_id, dataset_id, table_id, dbt_invocation_id, is_live=False): """ Dump performance graph JSON for a dbt_invocation_id Parameters ---------- client : google.cloud.bigquery.client.Client BigQuery client for querying project_id : str Project for the dbt log dataset_id : str Dataset for the dbt log (if live, correlates with jobs in same location) table_id : str Table id for the dbt log dbt_invocation_id : str Specific dbt_invocation_id to extract is_live : bool Whether to use running jobs or DBT results for analysis """ # Fully qualified table id full_table_id = f"{project_id}.{dataset_id}.{table_id}" # Generate query if is_live: # Fetch location of dataset for jobs location = client.get_dataset(f"{project_id}.{dataset_id}").location query = dbt_live_invocation_query(full_table_id, project_id, location, dbt_invocation_id) else: query = dbt_invocation_query(full_table_id, dbt_invocation_id) # Run query and dump results results = client.query(query) for row in results: parsed = json.loads(row[0]) print(json.dumps(parsed, indent=4, sort_keys=True)) def main(): """Extract DBT JSON performance graph from historical (dbt_log) or live data""" parser = argparse.ArgumentParser(description='Extract DBT performance results') parser.add_argument('--project_id', type=str, default=os.getenv('PROJECT_ID'), help='Project for DBT log') parser.add_argument('--dataset_id', type=str, required=True, help='Dataset for DBT log') parser.add_argument('--table_id', type=str, default='dbt_log', help='Table ID for DBT Log') parser.add_argument('--num', type=int, default=10, help='Number of recent invocations to list (if none fetched)') parser.add_argument('--live', action='store_true', help='Query INFORMATION_SCHEMA instead of dbt log (region required)') parser.add_argument('dbt_invocation_id', type=str, nargs='?', help='DBT invocation id to extract') args = parser.parse_args() client = bigquery.Client() if not args.dbt_invocation_id: dbt_recent_invocations(client, args.project_id, args.dataset_id, args.table_id, args.num) else: dbt_dump_invocation(client, args.project_id, args.dataset_id, args.table_id, args.dbt_invocation_id, args.live) if __name__ == "__main__": main()