lib/metric-config-parser/metric_config_parser/monitoring.py (128 lines of code) (raw):
import copy
from typing import TYPE_CHECKING, Any, List, Mapping, Optional
import attr
if TYPE_CHECKING:
from metric_config_parser.config import ConfigCollection
from metric_config_parser.definition import DefinitionSpecSub
from metric_config_parser.alert import Alert, AlertsSpec
from metric_config_parser.data_source import DataSourcesSpec
from metric_config_parser.dimension import Dimension, DimensionsSpec
from metric_config_parser.experiment import Experiment
from metric_config_parser.metric import MetricsSpec, Summary
from metric_config_parser.parameter import ParameterSpec
from metric_config_parser.project import ProjectConfiguration, ProjectSpec
from metric_config_parser.util import converter
@attr.s(auto_attribs=True)
class MonitoringConfiguration:
"""
Represents configuration options.
All references, for example to data sources, have been resolved in this representation.
Instead of instantiating this directly, consider using MonitoringSpec.resolve().
"""
project: Optional[ProjectConfiguration] = None
metrics: List[Summary] = attr.Factory(list)
dimensions: List[Dimension] = attr.Factory(list)
alerts: List[Alert] = attr.Factory(list)
@attr.s(auto_attribs=True)
class MonitoringSpec:
"""
Represents a configuration file.
The expected use is like:
MonitoringSpec.from_dict(toml.load(my_configuration_file)).resolve()
which will produce a fully populated, concrete `MonitoringConfiguration`.
"""
metrics: MetricsSpec = attr.Factory(MetricsSpec)
data_sources: DataSourcesSpec = attr.Factory(DataSourcesSpec)
project: ProjectSpec = attr.Factory(ProjectSpec)
dimensions: DimensionsSpec = attr.Factory(DimensionsSpec)
alerts: AlertsSpec = attr.Factory(AlertsSpec)
parameters: ParameterSpec = attr.Factory(ParameterSpec)
_resolved: bool = False
@classmethod
def from_dict(cls, d: Mapping[str, Any]) -> "MonitoringSpec":
"""Create a `MonitoringSpec` from a dict."""
d = dict((k.lower(), v) for k, v in d.items())
return converter.structure(d, cls)
@classmethod
def from_definition_spec(
cls,
spec: "DefinitionSpecSub",
project: Optional["ProjectSpec"] = None,
) -> "MonitoringSpec":
from metric_config_parser.definition import DefinitionSpec
if not isinstance(spec, MonitoringSpec) and not isinstance(spec, DefinitionSpec):
raise ValueError(f"Cannot create MonitoringSpec from {spec}")
if project is None:
if isinstance(spec, MonitoringSpec):
return cls(
metrics=spec.metrics,
data_sources=spec.data_sources,
dimensions=spec.dimensions,
alerts=spec.alerts,
parameters=spec.parameters,
project=spec.project,
)
else:
return cls(
metrics=spec.metrics,
data_sources=spec.data_sources,
dimensions=spec.dimensions,
parameters=spec.parameters,
)
else:
return cls(
metrics=spec.metrics,
data_sources=spec.data_sources,
dimensions=spec.dimensions,
alerts=spec.alerts,
project=project,
parameters=spec.parameters,
)
def resolve(
self, experiment: Optional["Experiment"], configs: "ConfigCollection"
) -> MonitoringConfiguration:
"""Create a `MonitoringConfiguration` from the spec."""
if self._resolved:
raise Exception("Can't resolve an MonitoringSpec twice")
self._resolved = True
resolved_project = self.project.resolve(self, experiment, configs)
# filter to only have metrics that actually need to be monitored
metrics = []
for metric_ref in {p.name for p in self.project.metrics}:
if metric_ref in self.metrics.definitions:
metrics += self.metrics.definitions[metric_ref].resolve(
self, resolved_project, configs
)
else:
raise ValueError(f"No definition for metric {metric_ref}.")
# filter to only have dimensions that actually are in use
dimensions = []
for dimension_ref in {d.name for d in self.project.population.dimensions}:
if dimension_ref in self.dimensions.definitions:
dimensions.append(
self.dimensions.definitions[dimension_ref].resolve(
self, resolved_project, configs
)
)
else:
raise ValueError(f"No definition for dimension {dimension_ref}.")
# filter to only have alerts that actually are in use
alerts = []
for alert_ref in {d.name for d in self.project.alerts}:
if alert_ref in self.alerts.definitions:
alerts.append(
self.alerts.definitions[alert_ref].resolve(self, resolved_project, configs)
)
else:
raise ValueError(f"No definition for alert {alert_ref}.")
return MonitoringConfiguration(
project=resolved_project,
metrics=metrics,
dimensions=dimensions,
alerts=alerts,
)
def merge(self, other: Optional["DefinitionSpecSub"]):
"""Merge another monitoring spec into the current one."""
from metric_config_parser.definition import DefinitionSpec
if other:
if isinstance(other, MonitoringSpec):
self.project.merge(other.project)
if isinstance(other, MonitoringSpec) or isinstance(other, DefinitionSpec):
self.dimensions.merge(other.dimensions)
if isinstance(other, MonitoringSpec) or isinstance(other, DefinitionSpec):
self.alerts.merge(other.alerts)
self.data_sources.merge(other.data_sources)
self.metrics.merge(other.metrics)
@classmethod
def default_for_platform_or_type(
cls, platform: str, configs: "ConfigCollection"
) -> "MonitoringSpec":
"""Return the default config for the provided platform."""
default_metrics = configs.get_platform_defaults(platform)
if default_metrics is None or not hasattr(default_metrics, "project"):
spec = cls()
else:
spec = cls.from_definition_spec(default_metrics)
return copy.deepcopy(spec)