in src/braket/aws/aws_quantum_task_batch.py [0:0]
def retry_unsuccessful_tasks(self):
"""Retries any tasks in the batch without valid results.
This method should only be called after `results()` has been called at least once.
The method will generate new tasks for any failed tasks, so `self.task` and
`self.results()` may return different values after a call to this method.
Returns:
bool: Whether or not all retried tasks completed successfully.
"""
if not self._results:
raise RuntimeError("results() should be called before attempting to retry")
unsuccessful_indices = [index for index, result in enumerate(self._results) if not result]
# Return early if there's nothing to retry
if not unsuccessful_indices:
return True
retried_tasks = AwsQuantumTaskBatch._execute(
self._aws_session,
self._device_arn,
[self._task_specifications[i] for i in unsuccessful_indices],
self._s3_destination_folder,
self._shots,
self._max_parallel,
self._max_workers,
self._poll_timeout_seconds,
self._poll_interval_seconds,
*self._aws_quantum_task_args,
**self._aws_quantum_task_kwargs,
)
for index, task in zip(unsuccessful_indices, retried_tasks):
self._tasks[index] = task
retried_results = AwsQuantumTaskBatch._retrieve_results(retried_tasks, self._max_workers)
for index, result in zip(unsuccessful_indices, retried_results):
self._results[index] = result
self._unsuccessful = {
task.id for task, result in zip(retried_tasks, retried_results) if not result
}
return not self._unsuccessful