cookbooks/aws-parallelcluster-slurm/files/default/config_slurm/scripts/event_utils.py (152 lines of code) (raw):
# Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the
# License. A copy of the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "LICENSE.txt" file accompanying this file. This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
# OR CONDITIONS OF ANY KIND, express or implied. See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import sys
import traceback
from collections import ChainMap
from datetime import datetime, timezone
from typing import Callable, Dict, Iterable
logger = logging.getLogger(__name__)
logger = logging.LoggerAdapter(logger, {"job_id": ""})
class EventPublisher:
"""Class for generating structured log events for cluster."""
def __init__(
self,
event_logger: any,
cluster_name: str,
node_role: str,
component: str,
instance_id: str,
**kwargs: Dict[str, any],
):
self.event_publisher = EventPublisher._make_event_publisher(
event_logger=event_logger,
cluster_name=cluster_name,
node_role=node_role,
component=component,
instance_id=instance_id,
**kwargs,
)
@property
def publish_event(self):
"""Return event publisher to publish an event."""
return self.event_publisher
@staticmethod
def current_time() -> datetime:
"""Return current time in UTC."""
return datetime.now(timezone.utc)
@staticmethod
def timestamp() -> str:
"""Return ISO formatted UTC timestamp."""
return EventPublisher.current_time().isoformat(timespec="milliseconds")
@staticmethod
def _make_event_publisher(
event_logger: any,
cluster_name: str,
node_role: str,
component: str,
instance_id: str,
**global_args: Dict[str, any],
) -> Callable:
def event_publisher(
event_level, event_type, event_message, timestamp: str = None, event_supplier: Iterable = None, **kwargs
):
if event_logger.isEnabledFor(event_level):
try:
now = timestamp if timestamp else EventPublisher.timestamp()
if not event_supplier:
event_supplier = (kwargs,)
for event in event_supplier:
default_properties = {
"datetime": now,
"version": 0,
"scheduler": "slurm",
"cluster-name": cluster_name,
"node-role": node_role,
"component": component,
"level": logging.getLevelName(event_level),
"instance-id": instance_id,
"event-type": event_type,
"message": event_message,
"detail": {},
}
event = ChainMap(event, kwargs, default_properties, global_args)
event_logger.log(event_level, "%s", json.dumps(dict(event)))
except Exception as e:
extraction = traceback.extract_stack()
logger.error(
"Failed to publish event `%s`: %s\n%s\n%s",
event_type,
e,
"".join(traceback.format_exception(*sys.exc_info())),
"".join(traceback.format_list(extraction)),
)
return event_publisher
def publish_health_check_result(
event_publisher: EventPublisher,
job_id: str,
health_check_name: str,
health_check_result: int,
health_check_output: str,
):
def detail_supplier():
yield {
"detail": {
"job-id": job_id,
"health-check-name": health_check_name,
"health-check-result": health_check_result,
"health-check-output": f"{health_check_output}".split("\n") if health_check_output else None,
},
}
try:
event_publisher.publish_event(
datetime=event_publisher.timestamp(),
event_level=logging.WARNING if health_check_result else logging.INFO,
event_type="compute-node-health-check",
event_message=f"Result of compute node health check {health_check_name}",
event_supplier=detail_supplier(),
)
except Exception as err:
logger.error("Exception while publishing event: %s", repr(err))
def publish_health_check_exception(
event_publisher: EventPublisher,
job_id: str,
health_check_name: str,
err: Exception,
):
try:
event_publisher.publish_event(
datetime=event_publisher.timestamp(),
event_level=logging.ERROR,
event_type="compute-node-health-check-exception",
event_message=f"Failure when executing health check {health_check_name}",
detail={
"job-id": job_id,
"health-check-name": health_check_name,
"error": repr(err),
},
)
except Exception as e:
logger.error("Exception while publishing event: %s", repr(e))
def get_node_spec_from_file(node_spec_file: str) -> Dict[str, any]:
if node_spec_file:
try:
return _read_node_spec(node_spec_file)
except Exception as e:
logger.error("Failed to load node spec file: %s", e)
return {
"region": "error",
"cluster_name": "error",
"scheduler": "error",
"node_role": "ComputeFleet",
"instance_id": "error",
"compute": {
"queue-name": "error",
"compute-resource": "error",
"name": "error",
"node-type": "error",
"instance-id": "error",
"instance-type": "error",
"availability-zone": "error",
"address": "error",
"hostname": "error",
},
}
def _read_node_spec(node_spec_path: str) -> Dict[str, any]:
with open(node_spec_path, "r", encoding="utf-8") as file:
return json.load(file)