jobs/fxci-taskcluster-export/fxci_etl/console.py (58 lines of code) (raw):

"""Command-line entry points for fxci-etl, built on cleo."""

from datetime import datetime, timedelta, timezone
import sys
from pathlib import Path

from cleo.application import Application
from cleo.commands.command import Command
from cleo.helpers import option

from fxci_etl.config import Config
from fxci_etl.metric.export import export_metrics
from fxci_etl.pulse.consume import drain
from fxci_etl.pulse.handler import BigQueryHandler

APP_NAME = "fxci-etl"


class ConfigCommand(Command):
    """Base command that adds a ``--config`` option and a config loader."""

    options = [
        option(
            "--config",
            description="Path to config file to use.",
            flag=False,
            default=None,
        )
    ]

    def parse_config(self, config_path: str | Path | None) -> Config:
        """Build the ``Config`` for this invocation.

        Args:
            config_path: Path to a config file, or a falsy value to skip
                file loading.

        Returns:
            ``Config.from_file(config_path)`` when a path is given,
            otherwise ``Config.from_env()``.
        """
        if config_path:
            return Config.from_file(config_path)
        return Config.from_env()


class PulseDrainCommand(ConfigCommand):
    """Run ``drain`` once for every configured pulse queue."""

    name = "pulse drain"
    description = "Process events in the pulse queues and exit."

    def handle(self) -> int:
        """Drain each queue in ``config.pulse.queues`` and return 0."""
        config = self.parse_config(self.option("config"))
        # The same handler list is passed to every queue's drain call.
        callbacks = [BigQueryHandler(config)]

        for queue in config.pulse.queues:
            drain(config, queue, callbacks)

        return 0


class MetricExportCommand(ConfigCommand):
    """Invoke ``export_metrics`` for a single calendar day."""

    name = "metric export"
    description = "Export configured metrics' timeseries."
    options = ConfigCommand.options + [
        option(
            "--date",
            flag=False,
            description="Calendar day to retrieve metrics from. Of the form "
            "'YYYY-MM-DD' (default: yesterday)",
        ),
        option(
            "--dry-run",
            flag=True,
            description="Print records rather than inserting them into BigQuery",
        ),
    ]

    def handle(self):
        """Resolve the target date and delegate to ``export_metrics``.

        When ``--date`` is not supplied, the date defaults to the day
        before the current UTC date, formatted as ``YYYY-MM-DD``.

        Returns:
            Whatever ``export_metrics`` returns.
        """
        config = self.parse_config(self.option("config"))

        date = self.option("date")
        if date is None:
            # "Yesterday" is computed from the current UTC date, not local time.
            yesterday = datetime.now(timezone.utc).date() - timedelta(days=1)
            date = yesterday.strftime("%Y-%m-%d")

        return export_metrics(config, date, dry_run=self.option("dry-run"))


def run() -> int:
    """Register the commands, run the application, and return its result.

    The result of ``Application.run()`` is returned so that the
    ``sys.exit(run())`` entry point below receives it, rather than the
    ``None`` it received when the result was discarded.

    NOTE(review): cleo's ``Application.run()`` may call ``sys.exit`` itself
    when auto-exit is enabled, in which case this return is never reached;
    confirm against the installed cleo version.
    """
    application = Application()
    application.add(PulseDrainCommand())
    application.add(MetricExportCommand())
    return application.run()


if __name__ == "__main__":
    sys.exit(run())