in azure/Kqlmagic/parser.py [0:0]
def _parse_kql_command(cls, section:Dict[str,str], user_ns:Dict[str,Any])->Tuple[str,Dict[str,Any]]:
code = section.get("body")
if section.get("name") != Constants.MAGIC_NAME:
name = section.get("name").replace("_", "").replace("-", "")
if name in ["py", "pyro", "pyrw"]:
obj = cls._COMMANDS_TABLE.get(name)
return ("", {"command": obj.get("flag"), "obj": obj, "params": [section.get("body"), name]})
else:
# line/cell magic
name = section.get("type")
obj = cls._COMMANDS_TABLE.get(name.replace("_", ""))
command_name = obj.get("flag")
params = []
body = section.get("body")
if command_name == "cell_magic":
first_line = body.split("\n",1)[0]
params.append(first_line.strip())
body = body[len(first_line) + 1:]
params.append(body)
params.append(section.get("name"))
return ("", {"command": command_name, "obj": obj, "params": params})
# kql section
lookup_key = None
trimmed_code = code
skip_words_count = 0
obj = None
params_type = None
command_name = None
params = []
words = code.split()
more_words_count = len(words)
for word in words:
more_words_count -= 1
if params_type == "not_quoted_str" and not word.startswith("-"):
# break
pass
if skip_words_count == 0:
_comment, skip_words_count = cls._parse_comment(word, trimmed_code)
if skip_words_count > 0:
skip_words_count -= 1
trimmed_code = trimmed_code[trimmed_code.find(word) + len(word):]
continue
# command
elif command_name is None:
if not word.strip().startswith("--"):
break
word = word[2:]
if word.startswith("-"):
raise ValueError(f"unknown command --{word}, commands' prefix should be a double hyphen-minus, not a triple hyphen-minus")
lookup_key = word.lower().replace("_", "").replace("-", "")
obj = cls._COMMANDS_TABLE.get(lookup_key)
if obj is None:
raise ValueError(f"unknown command --{word}")
command_name = obj.get("flag")
trimmed_code = trimmed_code[trimmed_code.find(word) + len(word):]
params_type = obj.get("type")
if params_type is None:
break
# option
elif word.startswith("-"):
break
# command's parameters
else:
command = {"command": command_name, "obj": obj, "params": params}
EXIT_ON_OPTION = True
trimmed_code, params = cls._parse_kql_command_params(command, trimmed_code, EXIT_ON_OPTION, user_ns)
break
if command_name is None:
return (code.strip(), {})
if command_name == "python":
params = params or [""]
params.append(lookup_key) # type of python "py", "pyro", "pyrw"
elif command_name in ["line_magic", "cell_magic"]:
body = params[0] if params else ""
magic_word = body.split(None, 1)[0]
body = body.lstrip()[len(magic_word):]
params = []
if command_name == "cell_magic":
first_line = body.split("\n", 1)[0]
params.append(first_line.strip())
body = body[len(first_line) + 1:]
else:
body = body.lstrip()
params.append(body)
ipykernel_prefix_len = 0
if command_name == "cell_magic" and magic_word.startswith(Constants.IPYKERNEL_CELL_MAGIC_PREFIX):
ipykernel_prefix_len = len(Constants.IPYKERNEL_CELL_MAGIC_PREFIX)
elif command_name == "line_magic" and magic_word.startswith(Constants.IPYKERNEL_LINE_MAGIC_PREFIX):
ipykernel_prefix_len = len(Constants.IPYKERNEL_LINE_MAGIC_PREFIX)
magic_name = magic_word[ipykernel_prefix_len:]
params.append(magic_name) # the magic name
if magic_name in ["py", "pyro", "pyrw"]:
obj = cls._COMMANDS_TABLE.get(magic_name)
body = params[0] if command_name == "line_magic" else f"{params[0]}\n{params[1]}"
return ("", {"command": obj.get("flag"), "obj": obj, "params": [body, magic_name]})
return (trimmed_code.strip(), {"command": command_name, "obj": obj, "params": params})