in scripts/generate_landmarks.py [0:0]
def get_instance(self, item, env):
    """Replay one trajectory and attach landmark annotations to its low-level actions.

    The trajectory ``self.trajectories[item]`` is replayed step by step in
    ``env``. Before each low-level action is executed, the events returned by
    ``_get_panorama_events`` are recorded. Afterwards, consecutive low-level
    actions that share the same ``high_idx`` are treated as one subgoal: the
    target object id and receptacle id are resolved at the last step of the
    subgoal, and every step of that subgoal in a deep copy of the trajectory
    receives a ``"subgoal"`` dict with optional ``"object"`` / ``"receptacle"``
    entries (bounding box, base64-encoded mask, mask shape and instance id).

    Args:
        item: Key/index into ``self.trajectories``.
        env: Environment exposing ``reset``, ``restore_scene``, ``step`` and
            ``set_task``.

    Returns:
        dict with:
            ``"file"``: destination path derived from the trajectory path by
                replacing ``TRAJ_DATA_JSON_FILENAME`` with
                ``self.args.ref_traj_file``;
            ``"traj"``: the annotated deep copy of the trajectory;
            ``"missing"``: one ``{"action", "traj"}`` record per subgoal for
                which neither an object nor a receptacle could be resolved.

    Raises:
        AssertionError: if a low-level action names a ``receptacleObjectId``
            different from the receptacle resolved for its subgoal.
    """

    def _encode_region(box, mask, instance_id):
        # assumes `box` and `mask` are array-like objects providing
        # .tolist()/.shape and the buffer protocol (e.g. numpy arrays)
        # -- TODO confirm against the simulator's event type.
        return dict(
            bbox=box.tolist(),
            mask=base64.b64encode(mask).decode("utf-8"),
            mask_shape=list(mask.shape),
            id=instance_id,
        )

    traj_data = self.trajectories[item]
    json_file = traj_data["path"]
    # the annotated trajectory is meant to be stored next to the original one
    dest_file = json_file.replace(TRAJ_DATA_JSON_FILENAME, self.args.ref_traj_file)

    # scene setup
    scene = traj_data['scene']
    scene_num = scene['scene_num']
    object_poses = scene['object_poses']
    object_toggles = scene['object_toggles']
    dirty_and_empty = scene['dirty_and_empty']

    # reset
    scene_name = 'FloorPlan%d' % scene_num
    env.reset(scene_name)
    env.restore_scene(object_poses, object_toggles, dirty_and_empty)
    env.step(dict(scene['init_action']))

    # setup task
    env.set_task(traj_data, self.args, reward_type='dense')

    high_pddl = traj_data["plan"]['high_pddl']
    low_actions = traj_data['plan']['low_actions']
    num_actions = len(low_actions)
    missing = []
    ref_traj_data = copy.deepcopy(traj_data)
    ref_low_actions = ref_traj_data["plan"]["low_actions"]
    events = []

    # only these fields of the recorded api_action are forwarded to the env
    api_keys = ('action', 'objectId', 'receptacleObjectId', 'placeStationary', 'forceAction')

    # we first replay the entire trajectory, recording the events observed
    # *before* each low-level action is executed
    for ll_action in low_actions:
        hl_action = high_pddl[ll_action["high_idx"]]
        is_navigation = hl_action["discrete_action"]["action"] == "GotoLocation"
        # we extract panorama info only for navigation instructions
        events.append(_get_panorama_events(env, is_navigation))
        api_action = ll_action['api_action']
        env.step({k: api_action[k] for k in api_keys if k in api_action})

    subgoal_start = 0
    # at this point we can trace the elements of the trajectory
    for ll_idx, ll_action in enumerate(low_actions):
        is_last = ll_idx == num_actions - 1
        if not is_last and ll_action["high_idx"] == low_actions[ll_idx + 1]["high_idx"]:
            # still inside the current subgoal
            continue

        # we are at the end of a subgoal: resolve its target ids, then go
        # back and annotate all the steps that belong to it
        hl_action = high_pddl[ll_action["high_idx"]]
        hl_name = hl_action["discrete_action"]["action"]
        # The last low-level action has no successor; peeking at index
        # ll_idx + 1 there would raise IndexError.
        next_api_action = None if is_last else low_actions[ll_idx + 1]["api_action"]

        object_id = None
        receptacle_id = None

        # we go through each event object for the current timestep
        for main_event in events[ll_idx]:
            if hl_name == "GotoLocation":
                # for a GotoLocation subgoal we want the landmark object
                if next_api_action is not None and next_api_action["action"] == "OpenObject":
                    # the landmark is the object that we want to open and
                    # not the one that is inside (it's not visible!)
                    object_id = next_api_action["objectId"]
                else:
                    landmark_name = hl_action["discrete_action"]["args"][0]
                    for obj_id in main_event.instance_detections2D.keys():
                        if landmark_name in obj_id.lower():
                            object_id = obj_id
                            break
                # NOTE(review): the receptacle lookup is applied to both the
                # OpenObject and the landmark-name case -- confirm intended.
                if object_id is not None:
                    receptacle_id = _get_receptacle(object_id, main_event)
            elif hl_name in ("HeatObject", "CoolObject"):
                # Heat/Cool high-level actions are derived from the most
                # recent PutObject low-level action: walk backwards until one
                # is found. The scan includes index 0.
                for prev_idx in range(ll_idx, -1, -1):
                    prev_api_action = low_actions[prev_idx]["api_action"]
                    if prev_api_action["action"] == "PutObject":
                        object_id = prev_api_action["objectId"]
                        if "receptacleObjectId" in prev_api_action:
                            receptacle_id = prev_api_action["receptacleObjectId"]
                        else:
                            receptacle_id = _get_receptacle(object_id, main_event)
                        break
            elif hl_name == "PutObject":
                # for these actions the API requires only the receptacle so
                # we assume it is the object of this action
                if "receptacleObjectId" in hl_action["planner_action"]:
                    object_id = hl_action["planner_action"]["receptacleObjectId"]
                else:
                    act_object_id = ll_action["api_action"]["objectId"]
                    object_id = _get_receptacle(act_object_id, main_event)
                # NOTE(review): looked up for both cases above -- confirm.
                receptacle_id = _get_receptacle(object_id, main_event)
            else:
                object_id = hl_action["planner_action"]["objectId"]
                receptacle_id = _get_receptacle(object_id, main_event)

            # the first event that yields an object id wins
            if object_id is not None:
                break

        if object_id is None and receptacle_id is None:
            missing.append({"action": ll_action, "traj": json_file})

        for subgoal_idx in range(subgoal_start, ll_idx + 1):
            object_box = None
            object_mask = None
            receptacle_box = None
            receptacle_mask = None
            # when several events of a step contain the id, the last one wins
            for event in events[subgoal_idx]:
                object_box = event.instance_detections2D.get(object_id, object_box)
                object_mask = event.instance_masks.get(object_id, object_mask)
                receptacle_box = event.instance_detections2D.get(receptacle_id, receptacle_box)
                receptacle_mask = event.instance_masks.get(receptacle_id, receptacle_mask)

            ref_action = ref_low_actions[subgoal_idx]
            subgoal = dict()
            ref_action["subgoal"] = subgoal
            ref_api_action = ref_action["api_action"]
            acted_id = ref_api_action.get("objectId")

            if ref_api_action["action"] == "PutObject":
                # a PutObject step is annotated with the receptacle only
                if receptacle_id is not None and receptacle_box is not None:
                    subgoal["object"] = _encode_region(
                        receptacle_box, receptacle_mask, receptacle_id)
            elif acted_id is not None and acted_id == object_id:
                # manipulation action that requires the object as reference;
                # if we have a receptacle we can specify it here as well
                if object_box is not None:
                    subgoal["object"] = _encode_region(object_box, object_mask, object_id)
                # NOTE(review): receptacle handling is independent of whether
                # the object box was found -- confirm intended.
                if receptacle_id is not None:
                    if ref_api_action.get("receptacleObjectId") is not None:
                        assert ref_api_action["receptacleObjectId"] == receptacle_id, (
                            "receptacleObjectId of low-level action %d does not match "
                            "the receptacle resolved for its subgoal" % subgoal_idx)
                    if receptacle_box is not None:
                        subgoal["receptacle"] = _encode_region(
                            receptacle_box, receptacle_mask, receptacle_id)
            elif receptacle_id is not None and acted_id is not None \
                    and acted_id == receptacle_id:
                # this is for actions like open and close that work on the
                # receptacle object; no separate receptacle entry is written
                if receptacle_box is not None:
                    subgoal["object"] = _encode_region(
                        receptacle_box, receptacle_mask, receptacle_id)
            else:
                # remaining steps (e.g. navigation): store whatever of the
                # object / receptacle is present in this step's events
                if object_box is not None:
                    subgoal["object"] = _encode_region(object_box, object_mask, object_id)
                if receptacle_box is not None:
                    subgoal["receptacle"] = _encode_region(
                        receptacle_box, receptacle_mask, receptacle_id)

        subgoal_start = ll_idx + 1

    return {"file": dest_file, "traj": ref_traj_data, "missing": missing}