in common_components/monitoring/dbt_perf.py [0:0]
def dbt_dump_invocation(client,
                        project_id,
                        dataset_id,
                        table_id,
                        dbt_invocation_id,
                        is_live=False):
    """
    Print the performance graph JSON for a single dbt invocation.

    A query is built against the dbt log table, executed through the
    given BigQuery client, and the first column of every returned row
    is decoded as JSON and pretty-printed (4-space indent, sorted keys).

    Parameters
    ----------
    client : google.cloud.bigquery.client.Client
        BigQuery client used to look up the dataset and run the query
    project_id : str
        Project that holds the dbt log
    dataset_id : str
        Dataset that holds the dbt log; in live mode its location is
        fetched and handed to the live query builder
    table_id : str
        Table id of the dbt log
    dbt_invocation_id : str
        The dbt_invocation_id to extract
    is_live : bool
        Whether to use running jobs or DBT results for analysis

    Returns
    -------
    None
        Output is written with ``print``; nothing is returned.
    """
    # The log table is always addressed as project.dataset.table.
    log_table = f"{project_id}.{dataset_id}.{table_id}"

    if not is_live:
        # Finished invocation: only the log table and the id are needed.
        sql = dbt_invocation_query(log_table, dbt_invocation_id)
    else:
        # Live mode additionally needs the location of the log dataset,
        # which is looked up through the client before building the query.
        dataset_ref = f"{project_id}.{dataset_id}"
        dataset_location = client.get_dataset(dataset_ref).location
        sql = dbt_live_invocation_query(log_table, project_id,
                                        dataset_location, dbt_invocation_id)

    # Execute and dump each row; column 0 is expected to hold a JSON string.
    for record in client.query(sql):
        graph = json.loads(record[0])
        pretty = json.dumps(graph, indent=4, sort_keys=True)
        print(pretty)