in client/securedrop_client/queue.py [0:0]
def process(self) -> None:
    """
    Process jobs from the queue until a condition requires the loop to stop.

    Each iteration blocks until the queue is non-empty, pops the next
    (priority, job) pair and stores the job on ``self.current_job``.

    If the job is a ClearQueueJob, call _clear() and return from the processing loop so that
    the processing thread can quit.

    If the job is a PauseQueueJob, emit the paused signal and return from the processing loop so
    that no more jobs are processed until the queue resumes.

    If the job raises RequestTimeoutError or ServerConnectionError, then:
        (1) Add a PauseQueueJob to the queue
        (2) Add the job back to the queue so that it can be reprocessed once the queue is
            resumed.

    If the job raises ApiInaccessibleError, then:
        (1) Set ``self.api_client`` to None, since we are no longer able to make api requests.
        (2) Return from the processing loop since a valid api client will be needed in order
            to process jobs.

    Any other exception raised while running the job is logged and the job is skipped; the
    loop then continues with the next job. (Generic exceptions are expected to be handled in
    _do_call_api already; this is a last-resort guard.)

    In every case ``self.current_job`` is reset to None while holding
    ``self.condition_add_or_remove_job``, and the database session, if one was created for
    the job, is closed.
    """
    while True:
        # Sleep until there is something to process, then take it without blocking; the
        # predicate guarantees the queue is non-empty while the condition is held.
        with self.condition_add_or_remove_job:
            self.condition_add_or_remove_job.wait_for(lambda: not self.queue.empty())
            _priority, self.current_job = self.queue.get(block=False)

        if isinstance(self.current_job, ClearQueueJob):
            # NOTE(review): _clear() is assumed to run with the condition held -- confirm.
            with self.condition_add_or_remove_job:
                self.current_job = None
                self._clear()
            return

        if isinstance(self.current_job, PauseQueueJob):
            self.paused.emit()
            with self.condition_add_or_remove_job:
                self.current_job = None
            return

        # Bind the name up front so that the ``finally`` clause can never hit an unbound (or
        # stale, from a previous iteration) ``session`` when the job is not an ApiJob or when
        # session_maker() itself raises.
        session = None
        try:
            if isinstance(self.current_job, ApiJob):
                session = self.session_maker()
                self.current_job._do_call_api(self.api_client, session)
        except ApiInaccessibleError as e:
            logger.debug(f"{type(e).__name__}: {e}")
            self.api_client = None
            with self.condition_add_or_remove_job:
                self.current_job = None
            return
        except (RequestTimeoutError, ServerConnectionError) as e:
            logger.debug(f"{type(e).__name__}: {e}")
            self.add_job(PauseQueueJob())
            # Detach the job from current_job before re-queueing it.
            # NOTE(review): _re_add_job() is assumed to require the condition held -- confirm.
            with self.condition_add_or_remove_job:
                job, self.current_job = self.current_job, None
                self._re_add_job(job)
        except Exception as e:
            logger.error("Skipping job")
            logger.debug(f"Skipping job: {type(e).__name__}: {e}")
        finally:
            # Runs on every exit path of the try block, including the ``return`` above.
            with self.condition_add_or_remove_job:
                self.current_job = None
            if session is not None:
                session.close()