def generate_cdf_from_task_by_gt()

in Luminous4Alfred/TaskParser/cdf_gene/utils.py [0:0]


def generate_cdf_from_task_by_gt(task, dataset_type="train", data_url="https://raw.githubusercontent.com/askforalfred/alfred/master/data/json_2.1.0/"):
    '''
    Generate task cdf from ai2thor 2.1.0 ground-truth json data
    task: traj_json
    state: ThorEnv.last_event
    '''
    
    json_path = os.path.join(data_url, dataset_type, task['task'], 'traj_data.json')

    with urllib.request.urlopen(json_path) as url:
        ex = json.loads(url.read().decode())

    cdf = {
        "task_desc": "",
        "scene": {
            "scene_type": [],
            "agent_init": [],
            "init_actions": [],
            "required_objects": []
        },
        "script": [],
        "task_goals": [],
        "attributes":{},
        "pddl_params": None,
    }
    # basic info
    cdf["task_desc"] = task["task"]
    floor_plan = ex['scene']['floor_plan']
    cdf["attributes"]['floor_plan'] = floor_plan
    floor_plan_id = int(floor_plan.split("Plan")[-1])
    cdf['scene']['scene_type'].append(0 if floor_plan_id < 100 else 1 if floor_plan_id < 300 else 2 if floor_plan_id < 400 \
        else 3)
    
    cdf["pddl_params"] = ex["pddl_params"]
    
    # if cdf['scene']['scene_type'][0] in [0,1,2]:
    #     if "cool" in task["task"] or "heat" in task["task"] or "clean" in task["task"]:
    #         print("buscar para task:", task)
    
    # set goals and script
    task_goal, script = get_task_goals_and_script(ex)
    cdf["task_goals"].append(task_goal)
    cdf["script"] = script
    
    # get required objects and set up scripts to solve the problem
    required_object_names = []
    
    if ex["task_type"] == 'pick_and_place_simple':
        how_many_pick_up = 0
        for plan_step in ex["plan"]["high_pddl"]:
            # get object
            if plan_step["planner_action"]["action"] == "PickupObject":
                how_many_pick_up += 1
                obj_id_need_pickup = plan_step["planner_action"]["objectId"]
                obj_need_pickup = obj_id_need_pickup.split("|")[0] + "_1"

                # get receptacle
                obj_receptacle = None
                # for obj in state.metadata["objects"]:
                #     if obj["objectId"] == obj_id_need_pickup:
                #         if obj["parentReceptacles"] is not None and len(obj["parentReceptacles"]) > 0:
                #             obj_receptacle_id = obj["parentReceptacles"][0]
                #             obj_receptacle = obj_receptacle_id.split("|")[0] + "_1"
                if "coordinateReceptacleObjectId" in plan_step["planner_action"] and plan_step["planner_action"]["coordinateReceptacleObjectId"] is not None:
                    obj_receptacle = plan_step["planner_action"]["coordinateReceptacleObjectId"][0] + "_1"   
                
                if obj_receptacle != None:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_receptacle,
                    })

                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                        "location": [{
                            obj_receptacle: "on",
                        }]
                    })
                else:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                    })

            # get target receptacle
            elif plan_step["planner_action"]["action"] == "PutObject":
                recept_id = plan_step["planner_action"]["receptacleObjectId"]
                recept = recept_id.split("|")[0] + "_1"
                cdf["scene"]["required_objects"].append({
                        "name":recept,
                    })

    elif ex["task_type"] == 'pick_and_place_with_movable_recep':
        how_many_pick_up = 0
        for plan_step in ex["plan"]["high_pddl"]:
            # get object
            if plan_step["planner_action"]["action"] == "PickupObject":
                how_many_pick_up += 1
                obj_id_need_pickup = plan_step["planner_action"]["objectId"]
                obj_need_pickup = obj_id_need_pickup.split("|")[0] + "_1"

                # get receptacle
                obj_receptacle = None
                # for obj in state.metadata["objects"]:
                #     if obj["objectId"] == obj_id_need_pickup:
                #         if obj["parentReceptacles"] is not None and len(obj["parentReceptacles"]) > 0:
                #             obj_receptacle_id = obj["parentReceptacles"][0]
                #             obj_receptacle = obj_receptacle_id.split("|")[0] + "_1"
                if "coordinateReceptacleObjectId" in plan_step["planner_action"] and plan_step["planner_action"]["coordinateReceptacleObjectId"] is not None:
                    obj_receptacle = plan_step["planner_action"]["coordinateReceptacleObjectId"][0] + "_1"   
                
                if obj_receptacle != None:
                    alread_in = False
                    for alread_in_obj in cdf["scene"]["required_objects"]:
                        if alread_in_obj["name"] == obj_receptacle:
                            alread_in = True
                            break
                    
                    if not alread_in:
                        cdf["scene"]["required_objects"].append({
                                "name":obj_receptacle,
                            })

                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                        "location": [{
                            obj_receptacle: "on",
                        }]
                    })
                else:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                    })

            # get target receptacle
            elif plan_step["planner_action"]["action"] == "PutObject":
                hand_obj = plan_step["planner_action"]["coordinateObjectId"][0] + "_1"

                say_hand = False
                for alread_in_obj in cdf["scene"]["required_objects"]:
                    if alread_in_obj["name"] == hand_obj:
                        say_hand = True
                        break
                
                if say_hand:
                    continue

                recept_id = plan_step["planner_action"]["receptacleObjectId"]
                recept = recept_id.split("|")[0] + "_1"
                alread_in = False
                for alread_in_obj in cdf["scene"]["required_objects"]:
                    if alread_in_obj["name"] == recept:
                        alread_in = True
                        break
                
                if not alread_in:
                    cdf["scene"]["required_objects"].append({
                            "name":recept,
                        })

    elif ex["task_type"] == 'pick_two_obj_and_place':
        how_many_pick_up = 0
        obj_receptacle = None
        for plan_step in ex["plan"]["high_pddl"]:
            # get object
            if plan_step["planner_action"]["action"] == "PickupObject" and how_many_pick_up == 0:
                how_many_pick_up += 1
                obj_id_need_pickup = plan_step["planner_action"]["objectId"]
                obj_need_pickup = obj_id_need_pickup.split("|")[0] + "_1"

                # get receptacle
                # for obj in state.metadata["objects"]:
                #     if obj["objectId"] == obj_id_need_pickup:
                #         if obj["parentReceptacles"] is not None and len(obj["parentReceptacles"]) > 0:
                #             obj_receptacle_id = obj["parentReceptacles"][0]
                #             obj_receptacle = obj_receptacle_id.split("|")[0] + "_1"
                if "coordinateReceptacleObjectId" in plan_step["planner_action"] and plan_step["planner_action"]["coordinateReceptacleObjectId"] is not None:
                    obj_receptacle = plan_step["planner_action"]["coordinateReceptacleObjectId"][0] + "_1"   
                
                if obj_receptacle != None:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_receptacle,
                    })

                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                        "location": [{
                            obj_receptacle: "on",
                        }]
                    })
                else:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                    })
            elif plan_step["planner_action"]["action"] == "PickupObject" and how_many_pick_up == 1:
                how_many_pick_up += 1
                obj_id_need_pickup = plan_step["planner_action"]["objectId"]
                obj_need_pickup = obj_id_need_pickup.split("|")[0] + "_2"

                # get receptacle
                obj_receptacle2 = None
                # for obj in state.metadata["objects"]:
                #     if obj["objectId"] == obj_id_need_pickup:
                #         if obj["parentReceptacles"] is not None and len(obj["parentReceptacles"]) > 0:
                #             obj_receptacle_id = obj["parentReceptacles"][0]
                #             obj_receptacle2 = obj_receptacle_id.split("|")[0] + "_1"
                if "coordinateReceptacleObjectId" in plan_step["planner_action"] and plan_step["planner_action"]["coordinateReceptacleObjectId"] is not None:
                    obj_receptacle2 = plan_step["planner_action"]["coordinateReceptacleObjectId"][0] + "_1"   
                
                if obj_receptacle2 != None:
                    if obj_receptacle2 != obj_receptacle:
                        cdf["scene"]["required_objects"].append({
                            "name":obj_receptacle2,
                        })

                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                        "location": [{
                            obj_receptacle2: "on",
                        }]
                    })
                else:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                    })


            # get target receptacle
            elif plan_step["planner_action"]["action"] == "PutObject" and how_many_pick_up == 1:
                recept_id = plan_step["planner_action"]["receptacleObjectId"]
                recept = recept_id.split("|")[0] + "_1"
                alread_in = False
                for alread_in_obj in cdf["scene"]["required_objects"]:
                    if alread_in_obj["name"] == recept:
                        alread_in = True
                        break
                
                if not alread_in:
                    cdf["scene"]["required_objects"].append({
                            "name":recept,
                        })

    elif ex["task_type"] == 'look_at_obj_in_light':
        for plan_step in ex["plan"]["high_pddl"]:
            # get object
            if plan_step["planner_action"]["action"] == "PickupObject":
                obj_id_need_pickup = plan_step["planner_action"]["objectId"]
                obj_need_pickup = obj_id_need_pickup.split("|")[0] + "_1"

                # get receptacle
                obj_receptacle = None
                # for obj in state.metadata["objects"]:
                #     if obj["objectId"] == obj_id_need_pickup:
                #         if obj["parentReceptacles"] is not None and len(obj["parentReceptacles"]) > 0:
                #             obj_receptacle_id = obj["parentReceptacles"][0]
                #             obj_receptacle = obj_receptacle_id.split("|")[0] + "_1"
                if "coordinateReceptacleObjectId" in plan_step["planner_action"] and plan_step["planner_action"]["coordinateReceptacleObjectId"] is not None:
                    obj_receptacle = plan_step["planner_action"]["coordinateReceptacleObjectId"][0] + "_1"   
                
                if obj_receptacle != None:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_receptacle,
                    })

                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                        "location": [{
                            obj_receptacle: "on",
                        }]
                    })
                else:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                    })

            # get target receptacle
            elif plan_step["planner_action"]["action"] == "ToggleObject":
                turnon_name = plan_step["planner_action"]["coordinateObjectId"][0]
                turnon_obj = turnon_name + "_1"
                if turnon_name == "DeskLamp":
                    cdf["scene"]["required_objects"].append({
                            "name":"Desk_1",
                        })
                    cdf["scene"]["required_objects"].append({
                        "name":turnon_obj,
                        "location": [{
                            "Desk_1": "on",
                        }]
                    })
                    
                else:    
                    cdf["scene"]["required_objects"].append({
                            "name":turnon_obj,
                        })

    elif ex["task_type"] in ['pick_cool_then_place_in_recep', "pick_clean_then_place_in_recep", "pick_heat_then_place_in_recep"]:
        how_many_pick_up = 0
        for plan_step in ex["plan"]["high_pddl"]:
            # get object
            if plan_step["planner_action"]["action"] == "PickupObject":
                how_many_pick_up += 1
                obj_id_need_pickup = plan_step["planner_action"]["objectId"]
                obj_need_pickup = obj_id_need_pickup.split("|")[0] + "_1"

                # get receptacle
                obj_receptacle = None
                if "coordinateReceptacleObjectId" in plan_step["planner_action"] and plan_step["planner_action"]["coordinateReceptacleObjectId"] is not None:
                    obj_receptacle = plan_step["planner_action"]["coordinateReceptacleObjectId"][0] + "_1"           
                
                if obj_receptacle != None:
                    alread_in = False
                    for alread_in_obj in cdf["scene"]["required_objects"]:
                        if alread_in_obj["name"] == obj_receptacle:
                            alread_in = True
                            break
                    
                    if not alread_in:
                        cdf["scene"]["required_objects"].append({
                                "name":obj_receptacle,
                            })

                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                        "location": [{
                            obj_receptacle: "on",
                        }]
                    })
                else:
                    cdf["scene"]["required_objects"].append({
                        "name":obj_need_pickup,
                    })

            # get target receptacle
            elif plan_step["planner_action"]["action"] in ["PutObject", "HeatObject"]:
                recept_id = plan_step["planner_action"]["coordinateReceptacleObjectId"]
                recept = recept_id[0] + "_1"

                alread_in = False
                for alread_in_obj in cdf["scene"]["required_objects"]:
                    if alread_in_obj["name"] == recept:
                        alread_in = True
                        break
                
                if not alread_in:
                    cdf["scene"]["required_objects"].append({
                            "name":recept,
                        })

                

    return cdf, ex, required_object_names