launcher/telemetry.py (78 lines of code) (raw):
import json
import os
import sys
import time
from dataclasses import asdict, dataclass, field
from typing import List
CW_NAME_SPACE = "RecipesTelemetry"
@dataclass
class Metric:
Name: str = None
Unit: str = None
@dataclass
class MetricDirective:
Namespace: str = ""
Dimensions: List[List[str]] = None
Metrics: List[Metric] = None
@dataclass
class Metadata:
CloudWatchMetrics: List[MetricDirective] = field(default_factory=lambda: [MetricDirective])
Timestamp: int = None
@dataclass
class CWTelemetryStart:
account_id: str = ""
training_start_time: int = 0
num_nodes: int = 0
job_name: str = ""
cluster_type: str = ""
instance_type: str = ""
_aws: Metadata = None
job_id: int = 0
recipe: str = ""
container: str = ""
class Telemetry:
def __init__(self, log_path="/var/log/aws/clusters/sagemaker-hyperpod-recipes-telemetry.log"):
self.log_path = log_path
def get_account_id(self):
import boto3
client = boto3.client("sts")
return client.get_caller_identity()["Account"]
def publish_cw_log(self, log):
save_log = asdict(log)
with open(self.log_path, "a") as f:
f.write(json.dumps(save_log, separators=(",", ":")) + "\n")
def start(
self,
cluster_type=None,
instance_type=None,
num_nodes=None,
job_id=None,
container=None,
):
if not os.path.exists(self.log_path):
return
account_id = self.get_account_id()
cw_telemetry_start = CWTelemetryStart(account_id=account_id)
cw_telemetry_start.training_start_time = int(time.time() * 1000)
cw_telemetry_start.num_nodes = int(num_nodes)
cw_telemetry_start.cluster_type = cluster_type
cw_telemetry_start.instance_type = instance_type
cw_telemetry_start.job_id = job_id
cw_telemetry_start.container = container
recipe = ""
for arg in sys.argv:
if arg.startswith("recipes="):
recipe = arg.split("=")[1]
cw_telemetry_start.recipe = recipe
metadata = Metadata(
Timestamp=int(time.time() * 1000),
CloudWatchMetrics=[
MetricDirective(
Namespace=CW_NAME_SPACE,
Dimensions=[[]],
Metrics=[Metric(Name="num_nodes", Unit="Count")],
)
],
)
cw_telemetry_start._aws = metadata
self.publish_cw_log(cw_telemetry_start)