bigquery_jobs_view.sql (73 lines of code) (raw):

-- Copyright 2023 Google LLC -- -- Licensed under the Apache License, Version 2.0 (the "License"); -- you may not use this file except in compliance with the License. -- You may obtain a copy of the License at -- -- http://www.apache.org/licenses/LICENSE-2.0 -- -- Unless required by applicable law or agreed to in writing, software -- distributed under the License is distributed on an "AS IS" BASIS, -- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -- See the License for the specific language governing permissions and -- limitations under the License. WITH -- Pull out the InsertJob events after the job is DONE. This will contain -- the information about each job that is finished. Most of the information -- is contained in metadataJson. RawJobsData AS ( SELECT PARSE_JSON(protopayload_auditlog.metadataJson) AS metadataJson, resource.labels.location, resource.labels.project_id, protopayload_auditlog.resourceName, protopayload_auditlog.authenticationInfo.principalEmail, FROM `${monitoring_dataset}.${audit_table}` d WHERE resource.type='bigquery_project' AND protopayload_auditlog.methodName='google.cloud.bigquery.v2.JobService.InsertJob' AND JSON_VALUE(protopayload_auditlog.metadataJson, "$.jobChange.after")='DONE' ), -- Extract from the metadataJson and resourceName important BigQuery -- attributes. ExtractedJobsData AS ( SELECT STRUCT( project_id, location, principalEmail, REGEXP_EXTRACT(resourceName, "projects/[^/]*/jobs/(.*)") AS job_id, JSON_VALUE(metadataJson, "$.jobChange.job.jobName") AS job_name, JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.type") AS job_type, JSON_VALUE(metadataJson, "$.jobChange.job.jobStatus.errorResult.code") AS error_code, JSON_VALUE(metadataJson, "$.jobChange.job.jobStatus.errorResult.message") AS error_message, JSON_VALUE(metadataJson, "$.jobChange.reason") AS reason ) AS job, STRUCT( JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.writeDisposition") AS writeDisposition, JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.statementType") AS statementType, JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.createDisposition") AS create_disposition, JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.destinationTable") AS destination_table, STRUCT( REGEXP_EXTRACT(JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.destinationTable"), r'projects/([^/]*)/') AS project, REGEXP_EXTRACT(JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.destinationTable"), r'projects/[^/]*/datasets/([^/]*)/') AS dataset, REGEXP_EXTRACT(JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.destinationTable"), r'projects/[^/]*/datasets/[^/]*/tables/(.*)') AS table ) AS destination_table_parts, JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.priority") AS priority, JSON_VALUE(metadataJson, "$.jobChange.job.jobConfig.queryConfig.query") AS query ) AS query, STRUCT( SAFE_CAST(JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.createTime") AS TIMESTAMP) AS create_time, SAFE_CAST(JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.endTime") AS TIMESTAMP) AS end_time, JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.queryStats.billingTier") AS billing_tier, IFNULL( SAFE_CAST(JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.queryStats.outputRowCount") AS INT64), 0) AS output_rows, JSON_VALUE_ARRAY(metadataJson, "$.jobChange.job.jobStats.queryStats.referencedTables") AS referenced_tables, IFNULL( SAFE_CAST(JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.queryStats.totalBilledBytes") AS INT64), 0) AS billed_bytes, IFNULL( SAFE_CAST(JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.queryStats.totalProcessedBytes") AS INT64), 0) AS processed_bytes, SAFE_CAST(JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.startTime") AS TIMESTAMP) AS start_time, IFNULL( SAFE_CAST(JSON_VALUE(metadataJson, "$.jobChange.job.jobStats.totalSlotMs") AS INT64), 0) AS slot_ms ) AS job_stats, JSON_QUERY(metadataJson, "$.jobChange.job.jobConfig.labels") AS labels FROM RawJobsData ), -- Extract DBT JSON structures from the query header and from -- DBT on run hook WithDBTPayload AS ( SELECT *, REGEXP_EXTRACT( query.query, r'/* ({.*}) \*\/') AS dbt_hdr, REGEXP_EXTRACT( query.query, r'/* DBT ({.*}) \*\/') AS dbt_payload FROM ExtractedJobsData ), -- Extract from the dbt_payload and dbt_hdr JSON structures -- DBT-specific information (where available) WithDBTData AS ( SELECT * EXCEPT (dbt_hdr), STRUCT( -- BigQuery deeplink CONCAT('https://console.cloud.google.com/bigquery', '?project=', job.project_id, '&j=bq:', job.location, ':', job.job_id, '&page=queryresults') AS bigquery_job, -- Destination table deeplink CONCAT('https://console.cloud.google.com/bigquery', '?project=', query.destination_table_parts.project, '&ws=!1m5!1m4!4m3!1s', query.destination_table_parts.project, '!2s', query.destination_table_parts.dataset, '!3s', query.destination_table_parts.table) AS destination_table ) AS links, STRUCT( JSON_VALUE(labels, "$.dbt_invocation_id") AS dbt_invocation_id, JSON_VALUE(dbt_hdr, "$.target_name") AS target_name, JSON_VALUE(dbt_hdr, "$.profile_name") AS profile_name, JSON_VALUE(dbt_hdr, "$.node_id") AS node_id, JSON_VALUE(dbt_hdr, "$.app") AS app, JSON_VALUE(dbt_hdr, "$.dbt_version") AS dbt_version ) AS dbt FROM WithDBTPayload ) SELECT * FROM WithDBTData