def update_scripts_from_content_set()

in mrs_plugin/lib/content_sets.py [0:0]


def _merge_grants_into_options(properties):
    """Return the `options` script property with the `grants` property merged in.

    Returns None when neither an `options` nor a `grants` property is set.
    """
    options = get_mrs_script_property(properties, "options", None)
    grants = get_mrs_script_property(properties, "grants", None)
    if grants is not None:
        if options is None:
            options = {}
        options["grants"] = grants
    return options


def _get_compiled_file_name(file_name):
    """Map a TypeScript source file name to the name of its compiled output.

    Only the file extension is rewritten (`.mts` -> `.mjs`, `.ts` -> `.js`), so
    a `.ts` sequence in the middle of the name is left untouched.
    """
    if file_name.endswith(".mts"):
        return file_name[:-len(".mts")] + ".mjs"
    if file_name.endswith(".ts"):
        return file_name[:-len(".ts")] + ".js"
    return file_name


def _collect_code_files_from_content_set(session, content_set_id, language):
    """Collect the script code files stored in the content set's content files.

    Returns a tuple (code_files, build_folder, static_content_folders) where
    build_folder is None if none of the files is located in a build folder.

    Raises ValueError if a matching TypeScript file holds binary content.
    """
    code_files = []
    build_folder = None
    static_content_folders = []

    files = content_files.get_content_files(
        session=session, content_set_id=content_set_id,
        include_enable_state=True, include_file_content=True)

    for content_file in files:
        request_path = content_file["request_path"]

        # Only the first two path components are checked for build and
        # static content folders
        leading_parts = pathlib.Path(request_path).parts[:2]
        for part in leading_parts:
            if is_common_build_folder(part):
                build_folder = part
                break
        for part in leading_parts:
            if is_common_static_content_folder(part):
                # Many files share the same folder, only list it once
                if part not in static_content_folders:
                    static_content_folders.append(part)
                break

        fullname = "." + content_file["content_set_request_path"] + request_path

        # Only TypeScript sources are processed; test specs and declaration
        # files are skipped
        if not (language == "TypeScript"
                and fullname.endswith((".mts", ".ts"))
                and not fullname.endswith((".spec.mts", ".spec.ts", ".d.ts"))):
            continue

        if not core.is_text(content_file["content"]):
            raise ValueError(
                f"The content of file {fullname} is binary data, not text.")
        code = content_file["content"].decode()

        # Clear TypeScript comments and strings for regex matching
        code_cleared = blank_quoted_js_strings(blank_js_comments(code))

        file_options = content_file.get("options")
        last_modification = ""
        if file_options is not None:
            last_modification = file_options.get("last_modification", "")

        code_files.append({
            "full_file_name": fullname,
            # fullname without the leading "." + content_set_request_path
            "relative_file_name": request_path,
            "file_name": os.path.basename(fullname),
            "last_modification": last_modification,
            "code": code,
            "code_cleared": code_cleared,
        })

    return code_files, build_folder, static_content_folders


def _build_script_parameters_object(session, script, db_object_id, full_path, row_ownership_param):
    """Build the PARAMETERS object of a script, with one field per parameter."""
    object_id = core.get_sequence_id(session=session)
    object_fields = []
    row_ownership_field_id = None

    for position, param in enumerate(script["parameters"]):
        object_field_id = core.get_sequence_id(session=session)
        if param["name"] == row_ownership_param:
            row_ownership_field_id = object_field_id

        object_field = {
            "id": object_field_id,
            "object_id": object_id,
            "name": param["name"],
            "position": position,
            "db_column": {
                "name": param["name"],
                "not_null": not param["optional"],
                "in": True,
                "datatype": map_ts_type_to_database_type(param["type"]),
                "is_array": param["is_array"],
            },
            "enabled": True,
            "allow_filtering": True,
            "allow_sorting": False,
            "no_check": False,
            "no_update": False,
        }
        if param.get("default", None) is not None:
            object_field["db_column"]["default"] = param["default"]
        # Store interface name
        type_interface = map_ts_type_to_interface(param["type"])
        if type_interface is not None:
            object_field["db_column"]["interface"] = type_interface

        object_fields.append(object_field)

    params_object = {
        "id": object_id,
        "db_object_id": db_object_id,
        "name": core.convert_path_to_pascal_case(full_path) + "Params",
        "kind": "PARAMETERS",
        "position": 0,
        "fields": object_fields,
    }
    if row_ownership_field_id is not None:
        params_object["row_ownership_field_id"] = row_ownership_field_id
    return params_object


def _build_script_result_object(session, script, db_object_id, full_path, interface_list):
    """Build the RESULT object of a script from its return type."""
    object_id = core.get_sequence_id(session=session)
    object_fields = []
    return_type = script["return_type"]["type"]
    returns_array = script["return_type"]["is_array"]

    if is_simple_typescript_type(return_type):
        # A simple type is returned as a single field called "result"
        object_field = {
            "id": core.get_sequence_id(session=session),
            "object_id": object_id,
            "name": "result",
            "position": 0,
            "db_column": {
                "name": "result",
                "not_null": True,
                "datatype": map_ts_type_to_database_type(return_type),
                "is_array": returns_array,
            },
            "enabled": True,
            "allow_filtering": True,
            "allow_sorting": False,
            "no_check": False,
            "no_update": False,
        }
        # Store interface name
        type_interface = map_ts_type_to_interface(return_type)
        if type_interface is not None:
            object_field["db_column"]["interface"] = type_interface

        object_fields.append(object_field)
    else:
        # Otherwise the fields are taken from the interface definition
        add_object_fields_from_interface(
            session=session, interface_name=return_type,
            interface_list=interface_list,
            object_id=object_id, object_fields=object_fields)

    return {
        "id": object_id,
        "db_object_id": db_object_id,
        "name": core.convert_path_to_pascal_case(full_path) + "Result",
        "kind": "RESULT",
        "position": 1,
        "fields": object_fields,
        "sdk_options": {
            "language_options": [
                {
                    "language": "TypeScript",
                    "class_name": return_type,
                }
            ],
            "class_name": return_type,
            "returns_array": returns_array,
        }
    }


def update_scripts_from_content_set(session, content_set_id, language, content_dir=None, ignore_list=None,
                                    send_gui_message=None):
    """Parse the MRS scripts of a content set and register them as REST objects.

    The code files are read from content_dir if given, otherwise from the
    content files stored for the content set. For each script module found, a
    REST schema of type SCRIPT_MODULE is created at the module's request path
    unless a schema is found there, and a db_object of type SCRIPT is added for
    every script of the module. Finally, the content set options are updated
    with the script definitions and the content files outside of static
    content folders are updated.

    Args:
        session: The database session to use.
        content_set_id: The id of the content set holding the scripts.
        language: The MRS scripting language, e.g. "TypeScript".
        content_dir: Optional path of a local folder to read the code files
            from instead of the stored content files.
        ignore_list: Passed on to get_code_files_from_folder when content_dir
            is given.
        send_gui_message: Optional callback taking a message kind and a
            message text. Defaults to print_gui_message.

    Returns:
        The script definition dict. When parsing reported errors the dict is
        returned right away without registering anything. None is returned if
        no code file matches the given language.

    Raises:
        ValueError: If the content set or its service cannot be found, or a
            TypeScript file stored in the content set holds binary data.
        Exception: If the language is TypeScript and no build folder is found.
    """
    if send_gui_message is None:
        send_gui_message = print_gui_message

    content_set = get_content_set(
        session=session, content_set_id=content_set_id)
    if content_set is None:
        raise ValueError(
            "Could not load the MRS scripts. The given content set was not found.")

    service = services.get_service(
        session=session, service_id=content_set["service_id"])
    if service is None:
        raise ValueError(
            "Could not load the MRS scripts. The content set's service was not found.")

    if content_dir is not None:
        code_files, build_folder, static_content_folders = get_code_files_from_folder(
            path=content_dir, ignore_list=ignore_list, language=language)
    else:
        code_files, build_folder, static_content_folders = _collect_code_files_from_content_set(
            session=session, content_set_id=content_set_id, language=language)

    # Drop duplicate folders while keeping their order, so each folder is
    # stored and matched only once
    static_content_folders = list(dict.fromkeys(static_content_folders or []))

    if language == "TypeScript" and build_folder is None:
        raise Exception(
            "The projects build directory was not found. Please build the project before adding the content set.")

    if not code_files:
        send_gui_message(
            "info", "None of the files matches the specified MRS scripting language.")
        # NOTE(review): no script definition is returned in this case
        return None

    send_gui_message(
        "info", f"Parsing {len(code_files)} MRS Script files ...")

    script_def = get_mrs_script_definitions_from_code_file_list(
        code_files, language=language, send_gui_message=send_gui_message)
    if build_folder:
        script_def["build_folder"] = build_folder
    if static_content_folders:
        script_def["static_content_folders"] = static_content_folders

    # Do not register anything if the parser reported errors
    if script_def["errors"]:
        return script_def

    interface_list = script_def["interfaces"]
    script_module_files = []
    for script_module in script_def["script_modules"]:
        properties = script_module["properties"]

        # Add script_module_files information for this module
        output_file_path = get_mrs_script_property(
            properties, "outputFilePath")
        if output_file_path is not None:
            # If an explicit outputFilePath has been given, ensure that / are
            # used and the path starts with /
            file_to_load = output_file_path
            if "\\" in file_to_load and "/" not in file_to_load:
                file_to_load = file_to_load.replace("\\", "/")
            if not file_to_load.startswith("/"):
                file_to_load = "/" + file_to_load
        else:
            # Otherwise, use /<build_folder>/<file_name>
            file_name = script_module["file_info"]["file_name"]
            if language == "TypeScript":
                file_name = _get_compiled_file_name(file_name)
            file_to_load = "/" + build_folder + "/" + file_name

        script_module_files.append({
            "file_info": script_module["file_info"],
            "file_to_load": file_to_load,
            "class_name": script_module["class_name"],
        })

        request_path = get_mrs_script_property(
            properties, "requestPath", "/" + script_module["class_name"])

        # NOTE(review): the schema is looked up by request_path only, the
        # service is not part of the lookup - confirm this is intended
        schema = schemas.get_schema(
            session=session, request_path=request_path)
        if schema is None:
            name = get_mrs_script_property(
                properties, "name", core.convert_path_to_camel_case(request_path))

            send_gui_message(
                "info", f"Creating new REST schema `{name}` at {request_path} ...")

            schema_id = schemas.add_schema(
                session=session, schema_name=name,
                service_id=content_set["service_id"], request_path=request_path,
                enabled=get_mrs_script_property(properties, "enabled", True),
                internal=get_mrs_script_property(properties, "internal", True),
                requires_auth=get_mrs_script_property(
                    properties, "requiresAuth", False),
                # Add the grants for the given module to the options
                options=_merge_grants_into_options(properties),
                metadata=get_mrs_script_property(properties, "metadata", None),
                comments=get_mrs_script_property(properties, "comments", None),
                schema_type="SCRIPT_MODULE",
            )

            schema = schemas.get_schema(session=session, schema_id=schema_id)

        send_gui_message(
            "info", f"Adding MRS scripts to REST schema {schema.get('name')} at {request_path} ...")

        # Add a db_object for each script
        for script in script_module["scripts"]:
            func_props = script["properties"]
            func_request_path = get_mrs_script_property(
                func_props, "requestPath", "/" + script["function_name"])
            full_path = service["url_context_root"] + \
                schema["request_path"] + func_request_path
            row_ownership_param = get_mrs_script_property(
                func_props, "rowOwnershipParameter")

            send_gui_message(
                "info", f"Adding MRS script {script['function_name']} at {func_request_path} ...")

            # The ids are fetched in a fixed order: db_object, parameters
            # object with its fields, then result object with its fields
            db_object_id = core.get_sequence_id(session=session)
            objects = [
                _build_script_parameters_object(
                    session=session, script=script, db_object_id=db_object_id,
                    full_path=full_path, row_ownership_param=row_ownership_param),
                _build_script_result_object(
                    session=session, script=script, db_object_id=db_object_id,
                    full_path=full_path, interface_list=interface_list),
            ]

            db_object_id, grants = db_objects.add_db_object(
                session=session,
                schema_id=schema["id"],
                db_object_id=db_object_id,
                db_object_name=get_mrs_script_property(
                    func_props, "name", script["function_name"]),
                request_path=func_request_path,
                db_object_type="SCRIPT",
                enabled=get_mrs_script_property(func_props, "enabled", True),
                internal=get_mrs_script_property(func_props, "internal", True),
                requires_auth=get_mrs_script_property(
                    func_props, "requiresAuth", True),
                # Add the grants for the given endpoint to the options
                options=_merge_grants_into_options(func_props),
                metadata=get_mrs_script_property(func_props, "metadata", None),
                comments=get_mrs_script_property(func_props, "comments", None),
                crud_operation_format=get_mrs_script_property(
                    func_props, "format", "FEED"),
                media_type=get_mrs_script_property(
                    func_props, "mediaType", None),
                items_per_page=None,
                auto_detect_media_type=None,
                auth_stored_procedure=None,
                objects=objects,
            )

            core.insert(table="content_set_has_obj_def", values={
                "content_set_id": content_set_id,
                "db_object_id": db_object_id,
                "kind": "Script",
                "priority": 0,
                "language": language,
                "name": script["function_name"],
                "class_name": script_module["class_name"],
                "options": {
                    "file_to_load": file_to_load
                }
            }).exec(session)

            # Execute the grant statements returned for the new db_object
            for grant in grants:
                core.MrsDbExec(grant).exec(session)

        # NOTE(review): triggers are only printed, they are not registered yet
        for trigger in script_module["triggers"]:
            print(f"trigger[\"function_name\"]={trigger['function_name']!r}")

    # Add the list of MRS script files to load, as well as the script definition
    options = content_set["options"]
    if options is None:
        # The content set might not have any options stored yet
        options = {}
    options["script_module_files"] = script_module_files
    options["script_definitions"] = script_def

    core.update(
        table="content_set",
        sets={"options": options},
        where=["id=?"]
    ).exec(session, [content_set_id])

    # Update content files, make all files private that are not in static folders
    where = ["content_set_id=?"]
    params = [content_set_id]
    if static_content_folders:
        like_conditions = []
        for folder in static_content_folders:
            if not folder.startswith("/"):
                folder = "/" + folder
            like_conditions.append("request_path LIKE ?")
            params.append(folder + "%")
        where.append("NOT (" + " OR ".join(like_conditions) + ")")
    # Without static folders no exclusion is added, as an empty "NOT ()"
    # would not be valid SQL; all files of the content set are updated then

    core.update(
        table="content_file",
        sets={"enabled": 2},
        where=where
    ).exec(session, params)

    return script_def