gcpdiag/lint/bigquery/warn_2023_002_wildcard_tables_query.py (46 lines of code) (raw):

#
# Copyright 2022 Google LLC
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3
"""No errors querying wildcard tables

A query has been used on a wildcard table and the field was not found
"""

from boltons.iterutils import get_path

from gcpdiag import lint, models
from gcpdiag.queries import apis, crm, logs

# String that is unique to this error.
MATCH_STR = 'Unrecognized name:'

# Log filter clauses (joined with ' AND ' in prepare_rule) that select failed
# audit log entries whose referenced table id ends in '*' and whose job error
# message contains MATCH_STR.
ERROR_IN_LOGGING = [
    'severity=ERROR',
    'protoPayload.@type="type.googleapis.com/google.cloud.audit.AuditLog"',
    r'protoPayload.serviceData.jobCompletedEvent.job.jobStatistics.referencedTables.tableId=~"\*$"',
    f'protoPayload.serviceData.jobCompletedEvent.job.jobStatus.error.message:"{MATCH_STR}"'
]

# Result of logs.query() per project id; filled by prepare_rule and read by
# run_rule.
logs_by_project = {}


def prepare_rule(context: models.Context):
  """Issue the log query for this rule and remember it for run_rule.

  Args:
    context: lint context; only its project_id is used here.
  """
  logs_by_project[context.project_id] = logs.query(
      project_id=context.project_id,
      resource_type='bigquery_resource',
      log_name='log_id("cloudaudit.googleapis.com%2Fdata_access")',
      filter_str=' AND '.join(ERROR_IN_LOGGING))


def run_rule(context: models.Context, report: lint.LintReportRuleInterface):
  """Report whether any wildcard-table "field not found" errors were logged.

  The rule is skipped when the logging or bigquery API is disabled for the
  project. Otherwise the project is reported as failed if at least one log
  entry fetched by prepare_rule carries MATCH_STR in
  protoPayload.status.message, and as ok if none does.

  Args:
    context: lint context; only its project_id is used here.
    report: receiver of the ok / failed / skipped result for the project.
  """
  project = crm.get_project(context.project_id)

  # Skip the entire rule if logging is disabled.
  if not apis.is_enabled(context.project_id, 'logging'):
    report.add_skipped(project, 'Logging api is disabled')
    return

  # Skip the rule if bigquery is not enabled.
  if not apis.is_enabled(context.project_id, 'bigquery'):
    report.add_skipped(project, 'bigquery api is disabled')
    return

  # prepare_rule may not have stored anything for this project, and a stored
  # query may have no entries; both cases are treated as "no errors".
  project_logs = logs_by_project.get(context.project_id)
  entries = project_logs.entries if project_logs else None

  # Entries without the message path yield '' and therefore never match.
  messages = (get_path(log_entry, ('protoPayload', 'status', 'message'),
                       default='') for log_entry in entries or ())
  error_messages = [message for message in messages if MATCH_STR in message]

  # No matching entry: report success.
  if not error_messages:
    report.add_ok(project)
    return

  # One or more matching entries: report failure.
  # NOTE(review): the remediation hint is passed as a separate third
  # positional argument rather than being concatenated onto the reason -
  # confirm that this is the intended parameter of add_failed.
  report.add_failed(
      project,
      ('There have been errors in your project'
       ' where a field was not found in a query using a wildcard table'),
      ' to prevent this, please rewrite your query or change the tables schema'
  )