in azure/Kqlmagic/parser.py [0:0]
def parse(cls, _line:str, _cell:str, config:Configurable, engines:List[Engine], user_ns:Dict[str,Any])->List[Dict[str,Any]]:
    """Split the magic input into sections and parse each section.

    When ``_cell`` is None the input is a line magic: the stripped ``_line``
    becomes a single "line_magic" section named ``Constants.MAGIC_NAME``.

    Otherwise ``_line`` and ``_cell`` are joined, stripped, and scanned line by
    line. A magic prefix is only honored when the previous line is blank (the
    first line counts as such) or the most recently recorded section is a
    line magic. The branches are checked in this order:

    1. A line starting with the cell-magic prefix closes the section being
       collected and opens a new one named after that magic word.
    2. While the open section belongs to another magic, every line is
       collected verbatim.
    3. A line starting with the line-magic prefix is recorded as its own
       "line_magic" section (a bare ``Constants.MAGIC_NAME`` line magic with
       nothing after it is skipped).
    4. A blank line closes the section being collected; the section is kept
       only if at least one of its lines is not a ``//`` comment.
    5. Any other line is added to the section being collected.

    If the last section's body is just ``;`` it is removed and every parsed
    section gets ``options["suppress_results"] = True``. If no section remains,
    a single empty "cell_magic" section is used.

    Args:
        _line: text that followed the magic name on the magic line.
        _cell: cell body, or None for a line magic.
        config, engines, user_ns: passed through unchanged to
            ``cls._parse_one_section``.

    Returns:
        The list of dicts returned by ``cls._parse_one_section``, one per
        section, in order. A section without its own connection string and
        with an empty "command" gets the most recent connection string seen
        so far; the last entry is flagged with ``"last_query": True``.
    """
    is_cell = _cell is not None
    code = f"{_line}\n{_cell or ''}".strip()

    #
    # split string to queries
    #
    suppress_all_results = False
    # each section is a dict with the keys "type", "name" and "body"
    sections:List[Dict[str,str]] = []
    if is_cell:
        cell_prefix = Constants.IPYKERNEL_CELL_MAGIC_PREFIX
        line_prefix = Constants.IPYKERNEL_LINE_MAGIC_PREFIX

        def has_query_text(lines:List[str])->bool:
            # True when at least one line is not a "//" comment line
            return any(not text.lstrip().startswith("//") for text in lines)

        magic_section_name = Constants.MAGIC_NAME
        section_lines:List[str] = []
        previous_line = " "  # must start as whitespace so the first line may be a magic line
        # note: splitlines(True) keeps the line terminator on each line
        for line in code.splitlines(True):
            lstripped_line = line.lstrip()
            # a magic prefix only counts right after a blank line or a line-magic section
            at_magic_boundary = previous_line.isspace() or (
                len(sections) > 0 and sections[-1].get("type") == "line_magic")

            if at_magic_boundary and lstripped_line.startswith(cell_prefix):
                # a new cell magic starts: flush whatever was collected so far
                if section_lines:
                    sections.append({"type": "cell_magic", "name": magic_section_name, "body": "".join(section_lines)})
                magic_word = lstripped_line.split(None, 1)[0]
                magic_section_name = magic_word[len(cell_prefix):]
                remainder = lstripped_line[len(magic_word):].lstrip()
                # remainder was left-stripped, so "nothing after the magic word" shows up
                # as "" - and "".isspace() is False - hence the emptiness test here
                if magic_section_name == Constants.MAGIC_NAME and not remainder:
                    section_lines = []
                else:
                    section_lines = [remainder]

            elif magic_section_name != Constants.MAGIC_NAME:
                # inside another magic's section: keep every line verbatim
                section_lines.append(line)

            elif at_magic_boundary and lstripped_line.startswith(line_prefix):
                magic_word = lstripped_line.split(None, 1)[0]
                magic_name = magic_word[len(line_prefix):]
                remainder = lstripped_line[len(magic_word):].lstrip()
                # skip a bare line magic of our own; other magics are recorded even when empty
                if remainder or magic_name != Constants.MAGIC_NAME:
                    sections.append({"type": "line_magic", "name": magic_name, "body": remainder})

            elif line.isspace():
                # blank line ends the current section; comment-only sections are dropped
                if section_lines:
                    if has_query_text(section_lines):
                        sections.append({"type": "cell_magic", "name": magic_section_name, "body": "".join(section_lines)})
                    section_lines = []

            else:
                section_lines.append(line)

            previous_line = line

        # flush the trailing section; the comment-only filter applies to our own magic only
        if section_lines:
            if magic_section_name != Constants.MAGIC_NAME or has_query_text(section_lines):
                sections.append({"type": "cell_magic", "name": magic_section_name, "body": "".join(section_lines)})

        # a final ";" section is a marker that suppresses the results of all sections
        if sections and sections[-1].get("body").strip() == ";":
            suppress_all_results = True
            sections.pop()

        if not sections:
            sections.append({"type": "cell_magic", "name": Constants.MAGIC_NAME, "body": ""})

    else:
        sections.append({"type": "line_magic", "name": Constants.MAGIC_NAME, "body": code.strip()})

    #
    # parse code to kql and options
    #
    parsed_sections = []
    last_connection_string = ""
    for section in sections:
        parsed_section = cls._parse_one_section(section, is_cell, config, engines, user_ns)
        connection_string = parsed_section.get("connection_string")
        if connection_string:
            last_connection_string = connection_string
        elif len(parsed_section.get("command")) == 0:
            # no connection string and no command: reuse the latest connection string
            parsed_section["connection_string"] = last_connection_string

        if suppress_all_results:
            parsed_section.get("options")["suppress_results"] = True
        parsed_sections.append(parsed_section)

    if parsed_sections:
        parsed_sections[-1]["last_query"] = True

    return parsed_sections