# treeherder/perf/auto_perf_sheriffing/sherlock.py
import logging
from datetime import datetime, timedelta
from json import JSONDecodeError
from logging import INFO, WARNING

from django.conf import settings
from django.db.models import QuerySet
from taskcluster.helper import TaskclusterConfig

from treeherder.perf.auto_perf_sheriffing.backfill_reports import (
BackfillReportMaintainer,
)
from treeherder.perf.auto_perf_sheriffing.backfill_tool import BackfillTool
from treeherder.perf.auto_perf_sheriffing.secretary import Secretary
from treeherder.perf.exceptions import CannotBackfillError, MaxRuntimeExceededError
from treeherder.perf.models import (
BackfillNotificationRecord,
BackfillRecord,
BackfillReport,
)

logger = logging.getLogger(__name__)

CLIENT_ID = settings.PERF_SHERIFF_BOT_CLIENT_ID
ACCESS_TOKEN = settings.PERF_SHERIFF_BOT_ACCESS_TOKEN


class Sherlock:
"""
Robot variant of a performance sheriff (the main class)
Automates backfilling of skipped perf jobs.
"""

    DEFAULT_MAX_RUNTIME = timedelta(minutes=50)

def __init__(
self,
report_maintainer: BackfillReportMaintainer,
backfill_tool: BackfillTool,
secretary: Secretary,
        max_runtime: timedelta | None = None,
        supported_platforms: list[str] | None = None,
):
self.report_maintainer = report_maintainer
self.backfill_tool = backfill_tool
self.secretary = secretary
self._max_runtime = self.DEFAULT_MAX_RUNTIME if max_runtime is None else max_runtime
self.supported_platforms = supported_platforms or settings.SUPPORTED_PLATFORMS
self._wake_up_time = datetime.now()
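
    # Main entry point: one full sheriffing pass. Each phase re-checks the
    # runtime budget via assert_can_run(), so a slow phase cannot silently
    # overrun the allotted time.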
def sheriff(self, since: datetime, frameworks: list[str], repositories: list[str]):
logger.info("Sherlock: Validating settings...")
self.secretary.validate_settings()
logger.info("Sherlock: Marking reports for backfill...")
self.secretary.mark_reports_for_backfill()
self.assert_can_run()
# secretary checks the status of all backfilled jobs
self.secretary.check_outcome()
self.assert_can_run()
        # the reporter tool should always run (it only handles preliminary records/reports)
logger.info("Sherlock: Reporter tool is creating/maintaining reports...")
self._report(since, frameworks, repositories)
self.assert_can_run()
# backfill tool follows
logger.info("Sherlock: Starting to backfill...")
self._backfill(frameworks, repositories)
self.assert_can_run()
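
    # The runtime budget is measured from instantiation time (_wake_up_time).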
def runtime_exceeded(self) -> bool:
elapsed_runtime = datetime.now() - self._wake_up_time
return self._max_runtime <= elapsed_runtime

    def assert_can_run(self):
if self.runtime_exceeded():
raise MaxRuntimeExceededError("Sherlock: Max runtime exceeded.")

    def _report(
self, since: datetime, frameworks: list[str], repositories: list[str]
) -> list[BackfillReport]:
return self.report_maintainer.provide_updated_reports(since, frameworks, repositories)

    def _backfill(self, frameworks: list[str], repositories: list[str]):
for platform in self.supported_platforms:
self.__backfill_on(platform, frameworks, repositories)
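
    # Backfills at most as many jobs as the secretary's per-platform budget
    # (`left`) allows, then records how many were actually consumed.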
def __backfill_on(self, platform: str, frameworks: list[str], repositories: list[str]):
left = self.secretary.backfills_left(on_platform=platform)
total_consumed = 0
records_to_backfill = self.__fetch_records_requiring_backfills_on(
platform, frameworks, repositories
)
logger.info(
f"Sherlock: {records_to_backfill.count()} records found to backfill on {platform.title()}."
)
for record in records_to_backfill:
if left <= 0 or self.runtime_exceeded():
break
left, consumed = self._backfill_record(record, left)
logger.info(f"Sherlock: Backfilled record with id {record.alert.id}.")
# Model used for reporting backfill outcome
BackfillNotificationRecord.objects.create(record=record)
total_consumed += consumed
self.secretary.consume_backfills(platform, total_consumed)
logger.info(f"Sherlock: Consumed {total_consumed} backfills for {platform.title()}.")
logger.debug(f"Sherlock: Having {left} backfills left on {platform.title()}.")
@staticmethod
def __fetch_records_requiring_backfills_on(
platform: str, frameworks: list[str], repositories: list[str]
) -> QuerySet:
records_to_backfill = BackfillRecord.objects.select_related(
"alert",
"alert__series_signature",
"alert__series_signature__platform",
"alert__summary__framework",
"alert__summary__repository",
).filter(
status=BackfillRecord.READY_FOR_PROCESSING,
alert__series_signature__platform__platform__icontains=platform,
alert__summary__framework__name__in=frameworks,
alert__summary__repository__name__in=repositories,
)
return records_to_backfill
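
    # Returns the updated (left, consumed) pair so the caller can keep its
    # per-platform accounting across records.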
def _backfill_record(self, record: BackfillRecord, left: int) -> tuple[int, int]:
consumed = 0
try:
context = record.get_context()
except JSONDecodeError:
logger.warning(f"Failed to backfill record {record.alert.id}: invalid JSON context.")
record.status = BackfillRecord.FAILED
record.save()
else:
data_points_to_backfill = self.__get_data_points_to_backfill(context)
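            # Each data point is tried independently; one failed backfill
            # request does not abort the remaining data points.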
for data_point in data_points_to_backfill:
if left <= 0 or self.runtime_exceeded():
break
try:
using_job_id = data_point["job_id"]
self.backfill_tool.backfill_job(using_job_id)
left, consumed = left - 1, consumed + 1
                except Exception as ex:  # also catches KeyError (missing "job_id") and CannotBackfillError
logger.debug(f"Failed to backfill record {record.alert.id}: {ex}")
else:
record.try_remembering_job_properties(using_job_id)
success, outcome = self._note_backfill_outcome(
record, len(data_points_to_backfill), consumed
)
log_level = INFO if success else WARNING
logger.log(log_level, f"{outcome} (for backfill record {record.alert.id})")
return left, consumed
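
    # Persists the outcome on the record and reports whether every requested
    # data point was actually backfilled.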
@staticmethod
def _note_backfill_outcome(
record: BackfillRecord, to_backfill: int, actually_backfilled: int
) -> tuple[bool, str]:
success = False
record.total_actions_triggered = actually_backfilled
if actually_backfilled == to_backfill:
record.status = BackfillRecord.BACKFILLED
success = True
outcome = "Backfilled all data points"
else:
record.status = BackfillRecord.FAILED
            if actually_backfilled == 0:
                outcome = "Backfill attempts on all data points failed immediately upon request."
            elif actually_backfilled < to_backfill:
                outcome = "Backfill attempts on some data points failed immediately upon request."
else:
raise ValueError(
f"Cannot have backfilled more than available attempts ({actually_backfilled} out of {to_backfill})."
)
record.set_log_details({"action": "BACKFILL", "outcome": outcome})
record.save()
return success, outcome

    @staticmethod
def _is_queue_overloaded(provisioner_id: str, worker_type: str, acceptable_limit=100) -> bool:
"""
Helper method for Sherlock to check load on processing queue.
Usage example: _queue_is_too_loaded('gecko-3', 'b-linux')
:return: True/False
"""
tc = TaskclusterConfig("https://firefox-ci-tc.services.mozilla.com")
tc.auth(client_id=CLIENT_ID, access_token=ACCESS_TOKEN)
queue = tc.get_service("queue")
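        # The Taskcluster queue service reports a pending-task count per
        # (provisioner, worker type) pair.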
pending_tasks_count = queue.pendingTasks(provisioner_id, worker_type).get("pendingTasks")
return pending_tasks_count > acceptable_limit
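
    # `context` is the JSON-decoded list from record.get_context(); each entry
    # is expected to carry a "job_id" key (see _backfill_record). When several
    # data points are present, the first one is skipped.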
@staticmethod
def __get_data_points_to_backfill(context: list[dict]) -> list[dict]:
        context_len = len(context)
        start = None  # slicing from None is the same as from 0, so an empty context yields []
if context_len == 1:
start = 0
elif context_len > 1:
start = 1
return context[start:]