in agents/psim.py [0:0]
def __init__(self,
task_ids,
tier,
num_workers,
max_len,
max_batch_size,
requires_imgs=True,
requires_featurized=True):
self.pipes = []
self.workers = []
sim_status_buffer = sharedctypes.RawArray(
ctypes.c_int32, max_batch_size)
self.sim_status_array = np.frombuffer(sim_status_buffer,
dtype=np.int32,
count=max_batch_size)
if requires_featurized:
img_buffer = sharedctypes.RawArray(
ctypes.c_long, (max_batch_size * max_len * phyre.SCENE_WIDTH *
phyre.SCENE_HEIGHT))
self.img_array = np.frombuffer(img_buffer, dtype=np.long,
count=max_batch_size * max_len * phyre.SCENE_WIDTH * phyre.SCENE_HEIGHT)\
.reshape(max_batch_size, max_len, phyre.SCENE_WIDTH, phyre.SCENE_HEIGHT)
else:
img_buffer = None
self.img_array = None
if requires_featurized:
feature_buffer = sharedctypes.RawArray(
ctypes.c_float, max_batch_size * max_len *
MAX_N_OBJECT_IN_SCENE * FEATURIZED_OBJECT_DIM)
self.feature_array = np.frombuffer(feature_buffer,
dtype=np.float32,
count=max_batch_size * max_len *
MAX_N_OBJECT_IN_SCENE *
FEATURIZED_OBJECT_DIM)\
.reshape(max_batch_size, max_len, MAX_N_OBJECT_IN_SCENE, FEATURIZED_OBJECT_DIM)
else:
feature_buffer = None
self.feature_array = None
if requires_featurized or requires_imgs:
mask_buffer = sharedctypes.RawArray(
ctypes.c_uint8, max_batch_size * max_len)
self.mask_array = np.frombuffer(mask_buffer, dtype=np.uint8, count=max_batch_size * max_len)\
.reshape(max_batch_size, max_len)
else:
mask_buffer = None
self.mask_array = None
action_dim = 3 if tier == "ball" else 6
action_buffer = sharedctypes.RawArray(
ctypes.c_float, max_batch_size * action_dim)
task_index_buffer = sharedctypes.RawArray(
ctypes.c_long, max_batch_size)
self.action_array = np.frombuffer(
action_buffer, dtype=np.float32,
count=max_batch_size * action_dim).reshape(max_batch_size,
action_dim)
self.task_ind_array = np.frombuffer(task_index_buffer,
dtype=np.long,
count=max_batch_size)
self.max_batch_size = max_batch_size
self.num_workers = num_workers
for i in range(num_workers):
par_con, child_con = multiprocessing.Pipe()
self.pipes.append(par_con)
worker = SimulationWorker(task_ids, tier, child_con, max_len,
sim_status_buffer, feature_buffer,
img_buffer, mask_buffer,
task_index_buffer, action_buffer,
max_batch_size, action_dim)
worker.start()
self.workers.append(worker)
# The following code segment is to make the ParallelPhyreSimulator constuctor wait for each worker to finish initializing its phyre.Simulator before returning
self.action_array[:] = 0.0
self.task_ind_array[:] = 0
for pipe in self.pipes:
pipe.send((0, 0, 0, 0, 0))
for pipe in self.pipes:
pipe.recv()