in src/python/phyre/eval_task_complexity.py [0:0]
def step(self):
"""Schedule a chunk of evaluation jobs."""
done_simulations_per_task_tier = {}
for key, stats in self._state['stats_per_task_tier'].items():
if key in self._state['done_task_tier']:
continue
counts = sum(stats['status_counts'].values())
done_simulations_per_task_tier[key] = counts
num_unresolved_task_tier_pairs = len(done_simulations_per_task_tier)
if self.reject_ball_solvable:
# First compute stats for ball tier.
ball_only = {
k: v
for k, v in done_simulations_per_task_tier.items()
if k[1] == 'ball'
}
if ball_only:
done_simulations_per_task_tier = ball_only
simluation_tasks = []
for key in itertools.cycle(list(done_simulations_per_task_tier)):
start = done_simulations_per_task_tier[key]
done_simulations_per_task_tier[key] += self.simulate_worker_size
task_id, tier = key
simluation_tasks.append((self._task_id_to_tasks[task_id], tier,
start, self.simulate_worker_size))
if len(simluation_tasks) >= self.warp_size:
break
logging.info(
'Starting simulation chunk with %d items. Total unresolved tasks:'
' %s. Simulations_done: %d', len(simluation_tasks),
num_unresolved_task_tier_pairs,
sum(
sum(x['status_counts'].values())
for x in self._state['stats_per_task_tier'].values()))
for result in self._pool.imap(_worker, simluation_tasks):
key = (result['task_id'], result['tier'])
if key in self._state['done_task_tier']:
# We scheduled a simulation task, but already got enough data.
# So just ignoring this bit to be agnostic of warp_size.
continue
# Note, we may "overshoot" here: update stats that are already complete.
stats = self._state['stats_per_task_tier'][key]
for status, count in result['statuses'].items():
stats['status_counts'][status] += count
stats['solutions'].extend(result['stable_solutions'])
del stats['solutions'][MAX_SOLUTIONS_TO_KEEP:]
stats['unstable_solutions'].extend(result['unstable_solutions'])
del stats['unstable_solutions'][MAX_SOLUTIONS_TO_KEEP:]
self._update_done_stats(*key)
return self.done()