in grolp/gen/game_states/game_state_base.py [0:0]
def step(self, action_or_ind, process_frame=True):
if self.event is None:
self.event = self.env.last_event
self.current_frame_count += 1
self.total_frame_count += 1
action, should_fail = self.get_action(action_or_ind)
if should_fail:
self.env.last_event.metadata['lastActionSuccess'] = False
else:
t_start = time.time()
start_pose = game_util.get_pose(self.event)
if 'action' not in action or action['action'] is None or action['action'] == 'None':
self.env.last_event.metadata['lastActionSuccess'] = True
else:
if constants.RECORD_VIDEO_IMAGES:
im_ind = len(glob.glob(constants.save_path + '/*.png'))
if 'Teleport' in action['action']:
position = self.env.last_event.metadata['agent']['position']
rotation = self.env.last_event.metadata['agent']['rotation']
start_horizon = self.env.last_event.metadata['agent']['cameraHorizon']
start_rotation = rotation['y']
if (np.abs(action['x'] - position['x']) > 0.001 or
np.abs(action['z'] - position['z']) > 0.001):
# Movement
for xx in np.arange(.1, 1, .1):
new_action = copy.deepcopy(action)
new_action['x'] = np.round(position['x'] * (1 - xx) + action['x'] * xx, 5)
new_action['z'] = np.round(position['z'] * (1 - xx) + action['z'] * xx, 5)
new_action['rotation'] = start_rotation
new_action['horizon'] = start_horizon
self.event = self.env.step(new_action)
cv2.imwrite(constants.save_path + '/%09d.png' % im_ind,
self.event.frame[:, :, ::-1])
game_util.store_image_name(
'%09d.png' % im_ind) # eww... seriously need to clean this up
im_ind += 1
if np.abs(action['horizon'] - self.env.last_event.metadata['agent']['cameraHorizon']) > 0.001:
end_horizon = action['horizon']
for xx in np.arange(.1, 1, .1):
new_action = copy.deepcopy(action)
new_action['horizon'] = np.round(start_horizon * (1 - xx) + end_horizon * xx, 3)
new_action['rotation'] = start_rotation
self.event = self.env.step(new_action)
cv2.imwrite(constants.save_path + '/%09d.png' % im_ind,
self.event.frame[:, :, ::-1])
game_util.store_image_name('%09d.png' % im_ind)
im_ind += 1
if np.abs(action['rotation'] - rotation['y']) > 0.001:
end_rotation = action['rotation']
for xx in np.arange(.1, 1, .1):
new_action = copy.deepcopy(action)
new_action['rotation'] = np.round(start_rotation * (1 - xx) + end_rotation * xx, 3)
self.event = self.env.step(new_action)
cv2.imwrite(constants.save_path + '/%09d.png' % im_ind,
self.event.frame[:, :, ::-1])
game_util.store_image_name('%09d.png' % im_ind)
im_ind += 1
self.event = self.env.step(action)
elif 'MoveAhead' in action['action']:
self.store_ll_action(action)
self.save_image(1)
events = self.env.smooth_move_ahead(action)
for event in events:
im_ind = len(glob.glob(constants.save_path + '/*.png'))
cv2.imwrite(constants.save_path + '/%09d.png' % im_ind, event.frame[:, :, ::-1])
game_util.store_image_name('%09d.png' % im_ind)
elif 'Rotate' in action['action']:
self.store_ll_action(action)
self.save_image(1)
events = self.env.smooth_rotate(action)
for event in events:
im_ind = len(glob.glob(constants.save_path + '/*.png'))
cv2.imwrite(constants.save_path + '/%09d.png' % im_ind, event.frame[:, :, ::-1])
game_util.store_image_name('%09d.png' % im_ind)
elif 'Look' in action['action']:
self.store_ll_action(action)
self.save_image(1)
events = self.env.smooth_look(action)
for event in events:
im_ind = len(glob.glob(constants.save_path + '/*.png'))
cv2.imwrite(constants.save_path + '/%09d.png' % im_ind, event.frame[:, :, ::-1])
game_util.store_image_name('%09d.png' % im_ind)
elif 'OpenObject' in action['action']:
open_action = dict(action=action['action'],
objectId=action['objectId'],
moveMagnitude=1.0)
self.store_ll_action(open_action)
self.save_act_image(open_action, dir=constants.BEFORE)
self.event = self.env.step(open_action)
self.save_act_image(open_action, dir=constants.AFTER)
self.check_action_success(self.event)
elif 'CloseObject' in action['action']:
close_action = dict(action=action['action'],
objectId=action['objectId'])
self.store_ll_action(close_action)
self.save_act_image(close_action, dir=constants.BEFORE)
self.event = self.env.step(close_action)
self.save_act_image(close_action, dir=constants.AFTER)
self.check_action_success(self.event)
elif 'PickupObject' in action['action']:
# [hack] correct object ids of slices
action['objectId'] = self.correct_slice_id(action['objectId'])
# open the receptacle if needed
parent_recep = self.get_parent_receps(action['objectId'])
if parent_recep is not None and parent_recep['objectType'] in constants.OPENABLE_CLASS_SET:
self.open_recep(parent_recep) # stores LL action
# close/open the object if needed
pickup_obj = game_util.get_object(action['objectId'], self.env.last_event.metadata)
if pickup_obj['objectType'] in constants.FORCED_OPEN_STATE_ON_PICKUP:
if pickup_obj['isOpen'] != constants.FORCED_OPEN_STATE_ON_PICKUP[pickup_obj['objectType']]:
if pickup_obj['isOpen']:
self.close_recep(pickup_obj) # stores LL action
else:
self.open_recep(pickup_obj) # stores LL action
# pick up the object
self.check_obj_visibility(action, min_pixels=10)
pickup_action = dict(action=action['action'],
objectId=action['objectId'])
self.store_ll_action(pickup_action)
self.save_act_image(pickup_action, dir=constants.BEFORE)
self.event = self.env.step(pickup_action)
self.save_act_image(pickup_action, dir=constants.AFTER)
self.check_action_success(self.event)
# close the receptacle if needed
if parent_recep is not None:
parent_recep = game_util.get_object(parent_recep['objectId'], self.env.last_event.metadata)
self.close_recep(parent_recep) # stores LL action
elif 'PutObject' in action['action']:
if len(self.env.last_event.metadata['inventoryObjects']) > 0:
inv_obj = self.env.last_event.metadata['inventoryObjects'][0]['objectId']
else:
raise RuntimeError("Taking 'PutObject' action with no held inventory object")
action['objectId'] = inv_obj
# open the receptacle if needed
parent_recep = game_util.get_object(action['receptacleObjectId'], self.env.last_event.metadata)
if parent_recep is not None and parent_recep['objectType'] in constants.OPENABLE_CLASS_SET:
self.open_recep(parent_recep) # stores LL action
# Open the parent receptacle of the movable receptacle target.
elif parent_recep['objectType'] in constants.MOVABLE_RECEPTACLES_SET:
movable_parent_recep_ids = parent_recep['parentReceptacles']
if movable_parent_recep_ids is not None and len(movable_parent_recep_ids) > 0:
print(parent_recep['objectId'], movable_parent_recep_ids) # DEBUG
movable_parent_recep = game_util.get_object(movable_parent_recep_ids[0],
self.env.last_event.metadata)
if movable_parent_recep['objectType'] in constants.OPENABLE_CLASS_SET:
self.open_recep(movable_parent_recep) # stores LL action
# put the object
put_action = dict(action=action['action'],
objectId=action['objectId'],
receptacleObjectId=action['receptacleObjectId'],
forceAction=True,
placeStationary=True)
self.store_ll_action(put_action)
self.save_act_image(put_action, dir=constants.BEFORE)
self.event = self.env.step(put_action)
self.save_act_image(put_action, dir=constants.AFTER)
self.check_obj_visibility(action)
self.check_action_success(self.event)
# close the receptacle if needed
if parent_recep is not None:
parent_recep = game_util.get_object(parent_recep['objectId'], self.env.last_event.metadata)
self.close_recep(parent_recep) # stores LL action
elif 'CleanObject' in action['action']:
# put the object in the sink
sink_obj_id = self.get_some_visible_obj_of_name('SinkBasin')['objectId']
inv_obj = self.env.last_event.metadata['inventoryObjects'][0]
put_action = dict(action='PutObject',
objectId=inv_obj['objectId'],
receptacleObjectId=sink_obj_id,
forceAction=True,
placeStationary=True)
self.store_ll_action(put_action)
self.save_act_image(put_action, dir=constants.BEFORE)
self.event = self.env.step(put_action)
self.save_act_image(put_action, dir=constants.AFTER)
self.check_obj_visibility(put_action)
self.check_action_success(self.event)
# turn on the tap
clean_action = copy.deepcopy(action)
clean_action['action'] = 'ToggleObjectOn'
clean_action['objectId'] = game_util.get_obj_of_type_closest_to_obj(
"Faucet", inv_obj['objectId'], self.env.last_event.metadata)['objectId']
self.store_ll_action(clean_action)
self.save_act_image(clean_action, dir=constants.BEFORE)
self.event = self.env.step({k: clean_action[k]
for k in ['action', 'objectId']})
self.save_act_image(clean_action, dir=constants.AFTER)
self.check_action_success(self.event)
# Need to delay one frame to let `isDirty` update on stream-affected.
self.env.noop()
# Call built-in 'CleanObject' THOR action on every object in the SinkBasin.
# This means we clean everything in the sink, rather than just the things that happen to touch
# the water stream, which is the default simulator behavior but looks weird for our purposes.
sink_basin_obj = game_util.get_obj_of_type_closest_to_obj(
"SinkBasin", clean_action['objectId'], self.env.last_event.metadata)
for in_sink_obj_id in sink_basin_obj['receptacleObjectIds']:
if (game_util.get_object(in_sink_obj_id, self.env.last_event.metadata)['dirtyable']
and game_util.get_object(in_sink_obj_id, self.env.last_event.metadata)['isDirty']):
self.event = self.env.step({'action': 'CleanObject',
'objectId': in_sink_obj_id})
# turn off the tap
close_action = copy.deepcopy(clean_action)
close_action['action'] = 'ToggleObjectOff'
self.store_ll_action(close_action)
self.save_act_image(close_action, dir=constants.BEFORE)
self.event = self.env.step({k: close_action[k]
for k in ['action', 'objectId']})
self.save_act_image(action, dir=constants.AFTER)
self.check_action_success(self.event)
# pick up the object from the sink
pickup_action = dict(action='PickupObject',
objectId=inv_obj['objectId'])
self.store_ll_action(pickup_action)
self.save_act_image(pickup_action, dir=constants.BEFORE)
self.event = self.env.step(pickup_action)
self.save_act_image(pickup_action, dir=constants.AFTER)
self.check_obj_visibility(pickup_action)
self.check_action_success(self.event)
elif 'HeatObject' in action['action']:
# open the microwave
microwave_obj_id = self.get_some_visible_obj_of_name('Microwave')['objectId']
microwave_obj = game_util.get_object(microwave_obj_id, self.env.last_event.metadata)
self.open_recep(microwave_obj)
# put the object in the microwave
inv_obj = self.env.last_event.metadata['inventoryObjects'][0]
put_action = dict(action='PutObject',
objectId=inv_obj['objectId'],
receptacleObjectId=microwave_obj_id,
forceAction=True,
placeStationary=True)
self.store_ll_action(put_action)
self.save_act_image(put_action, dir=constants.BEFORE)
self.event = self.env.step(put_action)
self.save_act_image(put_action, dir=constants.AFTER)
self.check_obj_visibility(put_action)
self.check_action_success(self.event)
# close the microwave
microwave_obj = game_util.get_object(microwave_obj_id, self.env.last_event.metadata)
self.close_recep(microwave_obj)
# turn on the microwave
heat_action = copy.deepcopy(action)
heat_action['action'] = 'ToggleObjectOn'
heat_action['objectId'] = microwave_obj_id
self.store_ll_action(heat_action)
self.save_act_image(heat_action, dir=constants.BEFORE)
self.event = self.env.step({k: heat_action[k]
for k in ['action', 'objectId']})
self.save_act_image(heat_action, dir=constants.AFTER)
# turn off the microwave
stop_action = copy.deepcopy(heat_action)
stop_action['action'] = 'ToggleObjectOff'
self.store_ll_action(stop_action)
self.save_act_image(stop_action, dir=constants.BEFORE)
self.event = self.env.step({k: stop_action[k]
for k in ['action', 'objectId']})
self.save_act_image(stop_action, dir=constants.AFTER)
# open the microwave
microwave_obj = game_util.get_object(microwave_obj_id, self.env.last_event.metadata)
self.open_recep(microwave_obj)
# pick up the object from the microwave
pickup_action = dict(action='PickupObject',
objectId=inv_obj['objectId'])
self.store_ll_action(pickup_action)
self.save_act_image(pickup_action, dir=constants.BEFORE)
self.event = self.env.step(pickup_action)
self.save_act_image(pickup_action, dir=constants.AFTER)
self.check_obj_visibility(pickup_action)
self.check_action_success(self.event)
# close the microwave again
microwave_obj = game_util.get_object(microwave_obj_id, self.env.last_event.metadata)
self.close_recep(microwave_obj)
elif 'CoolObject' in action['action']:
# open the fridge
fridge_obj_id = self.get_some_visible_obj_of_name('Fridge')['objectId']
fridge_obj = game_util.get_object(fridge_obj_id, self.env.last_event.metadata)
self.open_recep(fridge_obj)
# put the object in the fridge
inv_obj = self.env.last_event.metadata['inventoryObjects'][0]
put_action = dict(action='PutObject',
objectId=inv_obj['objectId'],
receptacleObjectId=fridge_obj_id,
forceAction=True,
placeStationary=True)
self.store_ll_action(put_action)
self.save_act_image(put_action, dir=constants.BEFORE)
self.event = self.env.step(put_action)
self.save_act_image(put_action, dir=constants.AFTER)
self.check_obj_visibility(put_action)
self.check_action_success(self.event)
# close and cool the object inside the frige
cool_action = dict(action='CloseObject',
objectId=action['objectId'])
self.store_ll_action(cool_action)
self.save_act_image(action, dir=constants.BEFORE)
self.event = self.env.step(cool_action)
self.save_act_image(action, dir=constants.MIDDLE)
self.save_act_image(action, dir=constants.AFTER)
# open the fridge again
fridge_obj = game_util.get_object(fridge_obj_id, self.env.last_event.metadata)
self.open_recep(fridge_obj)
# pick up the object from the fridge
pickup_action = dict(action='PickupObject',
objectId=inv_obj['objectId'])
self.store_ll_action(pickup_action)
self.save_act_image(pickup_action, dir=constants.BEFORE)
self.event = self.env.step(pickup_action)
self.save_act_image(pickup_action, dir=constants.AFTER)
self.check_obj_visibility(pickup_action)
self.check_action_success(self.event)
# close the fridge again
fridge_obj = game_util.get_object(fridge_obj_id, self.env.last_event.metadata)
self.close_recep(fridge_obj)
elif 'ToggleObject' in action['action']:
on_action = dict(action=action['action'],
objectId=action['objectId'])
toggle_obj = game_util.get_object(action['objectId'], self.env.last_event.metadata)
on_action['action'] = 'ToggleObjectOff' if toggle_obj['isToggled'] else 'ToggleObjectOn'
self.store_ll_action(on_action)
self.save_act_image(on_action, dir=constants.BEFORE)
self.event = self.env.step(on_action)
self.save_act_image(on_action, dir=constants.AFTER)
self.check_action_success(self.event)
elif 'SliceObject' in action['action']:
# open the receptacle if needed
parent_recep = self.get_parent_receps(action['objectId'])
if parent_recep is not None and parent_recep['objectType'] in constants.OPENABLE_CLASS_SET:
self.open_recep(parent_recep)
# slice the object
slice_action = dict(action=action['action'],
objectId=action['objectId'])
self.store_ll_action(slice_action)
self.save_act_image(slice_action, dir=constants.BEFORE)
self.event = self.env.step(slice_action)
self.save_act_image(action, dir=constants.AFTER)
self.check_action_success(self.event)
# close the receptacle if needed
if parent_recep is not None:
parent_recep = game_util.get_object(parent_recep['objectId'], self.env.last_event.metadata)
self.close_recep(parent_recep) # stores LL action
else:
# check that the object to pick is visible in the camera frame
if action['action'] == 'PickupObject':
self.check_obj_visibility(action)
self.store_ll_action(action)
self.save_act_image(action, dir=constants.BEFORE)
self.event = self.env.step(action)
self.save_act_image(action, dir=constants.AFTER)
if action['action'] == 'PutObject':
self.check_obj_visibility(action)
else:
self.event = self.env.step(action)
new_pose = game_util.get_pose(self.event)
point_dists = np.sum(np.abs(self.gt_graph.points - np.array(new_pose)[:2]), axis=1)
if np.min(point_dists) > 0.0001:
print('Point teleport failure')
self.event = self.env.step({
'action': 'Teleport',
'x': start_pose[0] * constants.AGENT_STEP_SIZE,
'y': self.agent_height,
'z': start_pose[1] * constants.AGENT_STEP_SIZE,
'rotateOnTeleport': True,
'rotation': new_pose[2] * 90,
})
self.env.last_event.metadata['lastActionSuccess'] = False
self.timers[0, 0] += time.time() - t_start
self.timers[0, 1] += 1
if self.timers[0, 1] % 100 == 0:
print('env step time %.3f' % (self.timers[0, 0] / self.timers[0, 1]))
self.timers[0, :] = 0
if self.env.last_event.metadata['lastActionSuccess']:
if action['action'] == 'OpenObject':
self.currently_opened_object_ids.add(action['objectId'])
elif action['action'] == 'CloseObject':
self.currently_opened_object_ids.remove(action['objectId'])
elif action['action'] == 'PickupObject':
self.inventory_ids.add(action['objectId'])
elif action['action'] == 'PutObject':
self.inventory_ids.remove(action['objectId'])
if self.env.last_event.metadata['lastActionSuccess'] and process_frame:
self.process_frame()