in occant_baselines/rl/planner.py [0:0]
def worker(remote, parent_remote, worker_id, use_weighted_graph, scale, niters):
parent_remote.close()
try:
while True:
cmd, data = remote.recv()
if cmd == "plan":
map_, start, goal, mask, allow_diagonal = data
if mask == 1 and not use_weighted_graph:
path_x, path_y = pyastar.astar_planner(
map_, start, goal, allow_diagonal
)
elif mask == 1 and use_weighted_graph:
path_x, path_y = pyastar.weighted_astar_planner(
map_, start, goal, allow_diagonal, scale, niters,
)
else:
path_x, path_y = None, None
remote.send((path_x, path_y))
elif cmd == "close":
remote.close()
break
except KeyboardInterrupt:
print("AStarPlannerVector worker: got KeyboardInterrupt")