jobs/fxci-taskcluster-export/fxci_etl/metric/export.py

import base64
import json
from datetime import datetime, timedelta, timezone
from pprint import pprint

from google.cloud import storage
from google.cloud.monitoring_v3 import (
    Aggregation,
    ListTimeSeriesRequest,
    MetricServiceClient,
    TimeInterval,
)
from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp

from fxci_etl.config import Config
from fxci_etl.loaders.bigquery import BigQueryLoader
from fxci_etl.schemas import Metrics

METRIC = "compute.googleapis.com/instance/uptime"
DEFAULT_INTERVAL = 3600 * 6  # seconds (6 hours)
MIN_BUFFER_TIME = 10  # minutes


class MetricExporter:
    def __init__(self, config: Config):
        self.config = config

        # Prefer explicit base64-encoded service account credentials from the
        # config; otherwise fall back to application default credentials.
        if config.storage.credentials:
            self.storage_client = storage.Client.from_service_account_info(
                json.loads(
                    base64.b64decode(config.storage.credentials).decode("utf8")
                )
            )
        else:
            self.storage_client = storage.Client(project=config.storage.project)

        if config.monitoring.credentials:
            self.metric_client = MetricServiceClient.from_service_account_info(
                json.loads(
                    base64.b64decode(config.monitoring.credentials).decode("utf8")
                )
            )
        else:
            self.metric_client = MetricServiceClient()

    def get_timeseries(self, project: str, interval: TimeInterval):
        metric_filter = f'metric.type="{METRIC}"'

        # Align over the full interval so each series collapses to a single
        # summed uptime point, then reduce across series grouped by instance.
        aggregation = Aggregation(
            alignment_period=Duration(
                seconds=int(interval.end_time.timestamp())  # type: ignore
                - int(interval.start_time.timestamp())  # type: ignore
            ),
            per_series_aligner=Aggregation.Aligner.ALIGN_SUM,
            cross_series_reducer=Aggregation.Reducer.REDUCE_SUM,
            group_by_fields=[
                "metric.labels.instance_name",
                "resource.labels.instance_id",
                "resource.labels.zone",
            ],
        )

        results = self.metric_client.list_time_series(
            request={
                "name": f"projects/{project}",
                "filter": metric_filter,
                "interval": interval,
                "view": ListTimeSeriesRequest.TimeSeriesView.FULL,
                "aggregation": aggregation,
            }
        )
        return results

    def get_time_interval(self, date: str) -> TimeInterval:
        """Return the time interval for the specified date."""
        now = datetime.now(timezone.utc)
        date_obj = datetime.strptime(date, "%Y-%m-%d")
        start_time = datetime.combine(date_obj, datetime.min.time()).replace(
            tzinfo=timezone.utc
        )
        end_time = datetime.combine(date_obj, datetime.max.time()).replace(
            tzinfo=timezone.utc
        )

        # Ensure end_time is at least MIN_BUFFER_TIME minutes in the past so
        # that Cloud Monitoring has finished adding metrics for the prior day.
        if now <= end_time + timedelta(minutes=MIN_BUFFER_TIME):
            raise Exception(
                f"Abort: metric export ran too close to {end_time}! It must "
                f"run at least {MIN_BUFFER_TIME} minutes after this time."
            )

        return TimeInterval(
            end_time=Timestamp(seconds=int(end_time.timestamp())),
            start_time=Timestamp(seconds=int(start_time.timestamp())),
        )


def export_metrics(config: Config, date: str, dry_run: bool = False) -> int:
    exporter = MetricExporter(config)
    interval = exporter.get_time_interval(date)

    records = []
    for project in config.monitoring.projects:
        for ts in exporter.get_timeseries(project, interval):
            if dry_run:
                pprint(ts)
                continue

            records.append(
                Metrics.from_dict(
                    {
                        "project": ts.resource.labels["project_id"],
                        "zone": ts.resource.labels["zone"],
                        "instance_id": ts.resource.labels["instance_id"],
                        "uptime": round(ts.points[0].value.double_value, 2),
                        "interval_start_time": ts.points[0].interval.start_time.timestamp(),  # type: ignore
                        "interval_end_time": ts.points[0].interval.end_time.timestamp(),  # type: ignore
                    }
                )
            )

    if dry_run:
        return 0

    if not records:
        raise Exception("Abort: No records retrieved!")

    loader = BigQueryLoader(config, "metrics")
    loader.replace(date, records)
    return 0
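
# ---------------------------------------------------------------------------
# Usage sketch (illustrative; not part of the original module). It shows how a
# caller might drive a one-day export. Building the Config is deployment-
# specific, so `load_config()` below is a hypothetical stand-in for however
# fxci_etl.config constructs one in your environment.
#
#     config = load_config()                              # hypothetical helper
#     export_metrics(config, "2024-05-01", dry_run=True)  # pretty-print series only
#     export_metrics(config, "2024-05-01")                # replace that date's rows
#                                                         # in the "metrics" table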