in experimenter/experimenter/kinto/tasks.py [0:0]
def nimbus_check_kinto_push_queue_by_collection(collection):
    """
    Broker the queue of experiment changes destined for one kinto collection.

    Kinto only allows a single pending review at a time, so this scheduled task
    makes sure that at most one experiment change is ever in review.

    Steps, in order:
    - If the collection has a pending review, pass it to handle_pending_review.
      A truthy result stops the task right there; otherwise a rollback is
      requested.
    - If the collection has a rejection, pass it to handle_rejection and
      request a rollback.
    - Roll back the collection's changes if either step above requested it.
    - Run the launching, updating and ending handlers against the collection's
      main records, then the waiting handler.
    - Dispatch at most one queued experiment to its kinto task. The launch
      queue is tried first, then the end queue, then the update queue.

    The "started" metric is always incremented; the "completed" metric is only
    incremented when the task does not stop early on a pending review.
    """
    metrics.incr(f"check_kinto_push_queue_by_collection:{collection}.started")

    # Slugs of every application whose config publishes to this collection.
    app_slugs = [
        config.slug
        for config in NimbusExperiment.APPLICATION_CONFIGS.values()
        if collection in config.kinto_collections
    ]
    client = KintoClient(collection)

    rollback_after_review = False
    if client.has_pending_review():
        logger.info(f"{collection} has pending review")
        if handle_pending_review(app_slugs, collection):
            # NOTE(review): presumably the review is still legitimately in
            # progress, so nothing else may be pushed -- confirm against
            # handle_pending_review.
            return
        rollback_after_review = True

    # The rejection check must come after the pending-review handling above.
    rollback_after_rejection = False
    if client.has_rejection():
        logger.info(f"{collection} has rejection")
        handle_rejection(app_slugs, client)
        rollback_after_rejection = True

    if rollback_after_review or rollback_after_rejection:
        client.rollback_changes()

    main_records = client.get_main_records()
    handle_launching_experiments(app_slugs, main_records, collection)
    handle_updating_experiments(app_slugs, main_records, collection)
    handle_ending_experiments(app_slugs, main_records, collection)
    handle_waiting_experiments(app_slugs, collection)

    # Queues in priority order, each paired with the task that processes its
    # head. Only the first non-empty queue gets dispatched on a given run.
    experiment_queues = NimbusExperiment.objects
    dispatch_order = (
        (experiment_queues.launch_queue, nimbus_push_experiment_to_kinto),
        (experiment_queues.end_queue, nimbus_end_experiment_in_kinto),
        (experiment_queues.update_queue, nimbus_update_experiment_in_kinto),
    )
    for fetch_queue, kinto_task in dispatch_order:
        queued_experiment = next(fetch_queue(app_slugs, collection), None)
        if queued_experiment:
            kinto_task.delay(collection, queued_experiment.id)
            break

    metrics.incr(f"check_kinto_push_queue_by_collection:{collection}.completed")