gcpdiag/lint/dataproc/warn_2024_002_hdfs_write_issue.py:

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Lint as: python3
"""HDFS can write file(s) to DataNode(s).

HDFS had issues writing file(s) to DataNode(s).
"""

import re

from boltons.iterutils import get_path

from gcpdiag import lint, models
from gcpdiag.queries import apis, dataproc, logs

RE_PATTERN = (
    '(.*could only be written to .*are excluded in this operation.*)|'
    '(.*could only be replicated to .*are excluded in this operation.*)')
LOG_NAME = 'hadoop-hdfs-namenode'
SEVERITY = 'DEFAULT'
LOG_FILTER = [f'severity="{SEVERITY}"', f'jsonPayload.message=~"{RE_PATTERN}"']
MSG_RE = re.compile(RE_PATTERN)

logs_by_project = {}
clusters_by_project = {}


def prepare_rule(context: models.Context):
  logs_by_project[context.project_id] = logs.query(
      project_id=context.project_id,
      resource_type='cloud_dataproc_cluster',
      log_name=f'log_id("{LOG_NAME}")',
      filter_str=' AND '.join(LOG_FILTER),
  )


def prefetch_rule(context: models.Context):
  clusters_by_project[context.project_id] = dataproc.get_clusters(context)


def is_relevant(entry, context):
  return all([
      get_path(entry,
               ('resource', 'labels', 'project_id')) == context.project_id,
      get_path(entry, ('resource', 'type')) == 'cloud_dataproc_cluster',
      get_path(entry,
               'logName') == f'projects/{context.project_id}/logs/{LOG_NAME}',
      # Default to '' so entries without a message don't raise PathAccessError.
      MSG_RE.match(get_path(entry, ('jsonPayload', 'message'), default='')),
  ])


def get_clusters_having_relevant_log_entries(context):
  return {
      get_path(e, ('resource', 'labels', 'cluster_name'), default=None)
      for e in logs_by_project[context.project_id].entries
      if is_relevant(e, context)
  }


def run_rule(context: models.Context, report: lint.LintReportRuleInterface):
  """Run the rule."""
  if not apis.is_enabled(context.project_id, 'logging'):
    report.add_skipped(None, 'logging api is disabled')
    return
  clusters_with_issues = get_clusters_having_relevant_log_entries(context)
  for cluster in clusters_by_project[context.project_id]:
    if cluster.name in clusters_with_issues:
      report.add_failed(
          cluster,
          'HDFS could not write file(s) to DataNode(s).',
      )
    else:
      report.add_ok(cluster)
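
For context, RE_PATTERN is written to catch the NameNode errors HDFS raises when a block cannot be placed on enough DataNodes. The sketch below is a minimal standalone check of that pattern; the sample message is a hypothetical error in the usual HDFS wording, not taken from the rule or its test data.

# Standalone illustration of RE_PATTERN (not part of the rule module).
import re

RE_PATTERN = (
    '(.*could only be written to .*are excluded in this operation.*)|'
    '(.*could only be replicated to .*are excluded in this operation.*)')
MSG_RE = re.compile(RE_PATTERN)

# Hypothetical NameNode error message; real log text may vary slightly.
sample = ('File /tmp/job/part-00000 could only be written to 0 of the 1 '
          'minReplication nodes. There are 2 datanode(s) running and '
          '2 node(s) are excluded in this operation.')
assert MSG_RE.match(sample)  # the rule would flag the cluster for this entry

Because logs.query may return entries gathered for several rules at once, the same pattern is applied twice: once server-side in LOG_FILTER and again client-side via MSG_RE inside is_relevant.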