def _parse_kql_command()

in azure/Kqlmagic/parser.py [0:0]


    def _parse_kql_command(cls, section:Dict[str,str], user_ns:Dict[str,Any])->Tuple[str,Dict[str,Any]]:
        code = section.get("body")
        if section.get("name") != Constants.MAGIC_NAME:
            name = section.get("name").replace("_", "").replace("-", "")
            if name in ["py", "pyro", "pyrw"]:
                obj = cls._COMMANDS_TABLE.get(name)
                return ("", {"command": obj.get("flag"), "obj": obj, "params": [section.get("body"), name]})
            else:
                # line/cell magic
                name = section.get("type")
                obj = cls._COMMANDS_TABLE.get(name.replace("_", ""))
                command_name = obj.get("flag")

                params = []
                body = section.get("body")
                if command_name == "cell_magic":
                    first_line = body.split("\n",1)[0]
                    params.append(first_line.strip())
                    body = body[len(first_line) + 1:]

                params.append(body)
                params.append(section.get("name"))

                return ("", {"command": command_name, "obj": obj, "params": params})

        # kql section
        lookup_key = None
        trimmed_code = code
        skip_words_count = 0
        obj = None
        params_type = None
        command_name = None
        params = []
        words = code.split()
        more_words_count = len(words)
        for word in words:
            more_words_count -= 1

            if params_type == "not_quoted_str" and not word.startswith("-"):
                # break
                pass

            if skip_words_count == 0:
                _comment, skip_words_count = cls._parse_comment(word, trimmed_code)

            if skip_words_count > 0:
                skip_words_count -= 1
                trimmed_code = trimmed_code[trimmed_code.find(word) + len(word):]
                continue

            # command 
            elif command_name is None:
                if not word.strip().startswith("--"):
                    break
                word = word[2:]
                if word.startswith("-"):
                    raise ValueError(f"unknown command --{word}, commands' prefix should be a double hyphen-minus, not a triple hyphen-minus")
                lookup_key = word.lower().replace("_", "").replace("-", "")
                obj = cls._COMMANDS_TABLE.get(lookup_key)
                if obj is None:
                    raise ValueError(f"unknown command --{word}")
                command_name = obj.get("flag")
                trimmed_code = trimmed_code[trimmed_code.find(word) + len(word):]

                params_type = obj.get("type")

                if params_type is None:
                    break
            # option
            elif word.startswith("-"):
                break

            # command's parameters
            else:
                command = {"command": command_name, "obj": obj, "params": params}
                EXIT_ON_OPTION = True
                trimmed_code, params = cls._parse_kql_command_params(command, trimmed_code, EXIT_ON_OPTION, user_ns)
                break

        if command_name is None:
            return (code.strip(), {})

        if command_name == "python":
            params = params or [""]
            params.append(lookup_key) # type of python "py", "pyro", "pyrw"

        elif command_name in ["line_magic", "cell_magic"]:
            body = params[0] if params else ""
            magic_word = body.split(None, 1)[0]
            body = body.lstrip()[len(magic_word):]

            params = []
            if command_name == "cell_magic":
                first_line = body.split("\n", 1)[0]
                params.append(first_line.strip())
                body = body[len(first_line) + 1:]
            else:
                body = body.lstrip()
            params.append(body)

            ipykernel_prefix_len = 0
            if command_name == "cell_magic" and magic_word.startswith(Constants.IPYKERNEL_CELL_MAGIC_PREFIX):
                ipykernel_prefix_len = len(Constants.IPYKERNEL_CELL_MAGIC_PREFIX)
            elif command_name == "line_magic" and magic_word.startswith(Constants.IPYKERNEL_LINE_MAGIC_PREFIX):
                ipykernel_prefix_len = len(Constants.IPYKERNEL_LINE_MAGIC_PREFIX)
            magic_name = magic_word[ipykernel_prefix_len:]
            params.append(magic_name) # the magic name

            if magic_name in ["py", "pyro", "pyrw"]:
                obj = cls._COMMANDS_TABLE.get(magic_name)
                body = params[0] if command_name == "line_magic" else f"{params[0]}\n{params[1]}"
                return ("", {"command": obj.get("flag"), "obj": obj, "params": [body, magic_name]})

        return (trimmed_code.strip(), {"command": command_name, "obj": obj, "params": params})