# in Luminous4Alfred/TaskParser/cdf_gene/utils.py [0:0]
def generate_cdf_from_task_by_gt(task, dataset_type="train", data_url="https://raw.githubusercontent.com/askforalfred/alfred/master/data/json_2.1.0/"):
    '''
    Generate a task cdf from the ai2thor 2.1.0 ground-truth json data.

    The trajectory file <data_url>/<dataset_type>/<task["task"]>/traj_data.json
    is downloaded and its high-level plan (ex["plan"]["high_pddl"]) is scanned
    for the objects and receptacles the scene has to contain.

    task: dict whose "task" entry is the trajectory path below the dataset split;
          it is also stored verbatim as the cdf's "task_desc"
    dataset_type: dataset split directory, e.g. "train"
    data_url: base URL of the json dataset

    Returns a tuple (cdf, ex, required_object_names):
      cdf: the generated cdf dict
      ex: the parsed traj_data.json content
      required_object_names: a list that is always empty; it is kept so that
          callers unpacking three values keep working

    Task types other than the ones handled below leave
    cdf["scene"]["required_objects"] empty.
    '''
    # Local import: URL segments must always be joined with "/", whereas
    # os.path.join would use "\\" on Windows and yield an invalid URL.
    import posixpath

    json_path = posixpath.join(data_url, dataset_type, task['task'], 'traj_data.json')
    with urllib.request.urlopen(json_path) as url:
        ex = json.loads(url.read().decode())

    # basic info
    floor_plan = ex['scene']['floor_plan']
    floor_plan_id = int(floor_plan.split("Plan")[-1])
    pddl_params = ex["pddl_params"]

    # goals and script
    task_goal, script = get_task_goals_and_script(ex)

    required_objects = []
    cdf = {
        "task_desc": task["task"],
        "scene": {
            "scene_type": [_cdf_scene_type(floor_plan_id)],
            "agent_init": [],
            "init_actions": [],
            "required_objects": required_objects,
        },
        "script": script,
        "task_goals": [task_goal],
        "attributes": {"floor_plan": floor_plan},
        "pddl_params": pddl_params,
    }

    # Never populated; returned only to keep the three-value return shape.
    required_object_names = []

    # get required objects; the plan is only read for a recognised task type
    task_type = ex["task_type"]
    if task_type == 'pick_and_place_simple':
        _cdf_collect_simple(ex["plan"]["high_pddl"], required_objects)
    elif task_type == 'pick_and_place_with_movable_recep':
        _cdf_collect_movable_recep(ex["plan"]["high_pddl"], required_objects)
    elif task_type == 'pick_two_obj_and_place':
        _cdf_collect_pick_two(ex["plan"]["high_pddl"], required_objects)
    elif task_type == 'look_at_obj_in_light':
        _cdf_collect_look_at_in_light(ex["plan"]["high_pddl"], required_objects)
    elif task_type in ['pick_cool_then_place_in_recep', "pick_clean_then_place_in_recep", "pick_heat_then_place_in_recep"]:
        _cdf_collect_state_change(ex["plan"]["high_pddl"], required_objects)

    return cdf, ex, required_object_names


def _cdf_scene_type(floor_plan_id):
    '''Map a numeric floor plan id onto the scene type index 0-3.'''
    # NOTE(review): the buckets presumably follow the AI2-THOR floor plan
    # numbering (kitchen / living room / bedroom / bathroom) - confirm.
    if floor_plan_id < 100:
        return 0
    if floor_plan_id < 300:
        return 1
    if floor_plan_id < 400:
        return 2
    return 3


def _cdf_instance_name(object_id, suffix="_1"):
    '''Turn an object id such as "Type|x|y|z" into the instance name "Type" + suffix.'''
    return object_id.split("|")[0] + suffix


def _cdf_source_receptacle(planner_action):
    '''Return the "<Type>_1" name of the receptacle an object is picked from, or None.'''
    coordinate = planner_action.get("coordinateReceptacleObjectId")
    if coordinate is None:
        return None
    return coordinate[0] + "_1"


def _cdf_is_listed(required_objects, name):
    '''Tell whether an entry called `name` is already among the required objects.'''
    return any(entry["name"] == name for entry in required_objects)


def _cdf_append_unique(required_objects, name):
    '''Append a bare {"name": name} entry unless one with that name already exists.'''
    if not _cdf_is_listed(required_objects, name):
        required_objects.append({"name": name})


def _cdf_append_placed(required_objects, name, receptacle, add_receptacle=True):
    '''
    Append the entry for object `name`, located "on" `receptacle` when one is known.

    When `receptacle` is None only the bare object entry is appended. Otherwise
    the receptacle entry is appended first (if `add_receptacle` is true),
    followed by the object entry carrying its location.
    '''
    if receptacle is None:
        required_objects.append({"name": name})
        return
    if add_receptacle:
        required_objects.append({"name": receptacle})
    required_objects.append({
        "name": name,
        "location": [{receptacle: "on"}],
    })


def _cdf_collect_simple(plan_steps, required_objects):
    '''Collect the required objects of a 'pick_and_place_simple' plan.'''
    for plan_step in plan_steps:
        planner_action = plan_step["planner_action"]
        action = planner_action["action"]
        if action == "PickupObject":
            picked = _cdf_instance_name(planner_action["objectId"])
            _cdf_append_placed(required_objects, picked, _cdf_source_receptacle(planner_action))
        elif action == "PutObject":
            # target receptacle; duplicates are not filtered for this task type
            required_objects.append({
                "name": _cdf_instance_name(planner_action["receptacleObjectId"]),
            })


def _cdf_collect_movable_recep(plan_steps, required_objects):
    '''Collect the required objects of a 'pick_and_place_with_movable_recep' plan.'''
    for plan_step in plan_steps:
        planner_action = plan_step["planner_action"]
        action = planner_action["action"]
        if action == "PickupObject":
            picked = _cdf_instance_name(planner_action["objectId"])
            receptacle = _cdf_source_receptacle(planner_action)
            _cdf_append_placed(
                required_objects, picked, receptacle,
                add_receptacle=not _cdf_is_listed(required_objects, receptacle),
            )
        elif action == "PutObject":
            # A put whose held object is already registered contributes no
            # target receptacle.
            held = planner_action["coordinateObjectId"][0] + "_1"
            if _cdf_is_listed(required_objects, held):
                continue
            _cdf_append_unique(
                required_objects,
                _cdf_instance_name(planner_action["receptacleObjectId"]),
            )


def _cdf_collect_pick_two(plan_steps, required_objects):
    '''Collect the required objects of a 'pick_two_obj_and_place' plan.'''
    pickups_seen = 0
    first_receptacle = None
    for plan_step in plan_steps:
        planner_action = plan_step["planner_action"]
        action = planner_action["action"]
        if action == "PickupObject" and pickups_seen == 0:
            pickups_seen += 1
            picked = _cdf_instance_name(planner_action["objectId"], "_1")
            first_receptacle = _cdf_source_receptacle(planner_action)
            _cdf_append_placed(required_objects, picked, first_receptacle)
        elif action == "PickupObject" and pickups_seen == 1:
            pickups_seen += 1
            # the second instance of the object type gets the "_2" suffix
            picked = _cdf_instance_name(planner_action["objectId"], "_2")
            second_receptacle = _cdf_source_receptacle(planner_action)
            # the receptacle is only added again if it differs from the first one
            _cdf_append_placed(
                required_objects, picked, second_receptacle,
                add_receptacle=second_receptacle != first_receptacle,
            )
        elif action == "PutObject" and pickups_seen == 1:
            # only the put that follows the first pickup registers the target
            _cdf_append_unique(
                required_objects,
                _cdf_instance_name(planner_action["receptacleObjectId"]),
            )


def _cdf_collect_look_at_in_light(plan_steps, required_objects):
    '''Collect the required objects of a 'look_at_obj_in_light' plan.'''
    for plan_step in plan_steps:
        planner_action = plan_step["planner_action"]
        action = planner_action["action"]
        if action == "PickupObject":
            picked = _cdf_instance_name(planner_action["objectId"])
            _cdf_append_placed(required_objects, picked, _cdf_source_receptacle(planner_action))
        elif action == "ToggleObject":
            toggled_type = planner_action["coordinateObjectId"][0]
            # a DeskLamp is always placed on a "Desk_1" that is added with it
            support = "Desk_1" if toggled_type == "DeskLamp" else None
            _cdf_append_placed(required_objects, toggled_type + "_1", support)


def _cdf_collect_state_change(plan_steps, required_objects):
    '''Collect the required objects of a pick_{cool,clean,heat}_then_place_in_recep plan.'''
    for plan_step in plan_steps:
        planner_action = plan_step["planner_action"]
        action = planner_action["action"]
        if action == "PickupObject":
            picked = _cdf_instance_name(planner_action["objectId"])
            receptacle = _cdf_source_receptacle(planner_action)
            _cdf_append_placed(
                required_objects, picked, receptacle,
                add_receptacle=not _cdf_is_listed(required_objects, receptacle),
            )
        elif action in ["PutObject", "HeatObject"]:
            # here the receptacle comes from the coordinate entry, which must exist
            target = planner_action["coordinateReceptacleObjectId"][0] + "_1"
            _cdf_append_unique(required_objects, target)