in azure/Kqlmagic/parser.py [0:0]
def _parse_kql_options(cls, code:str, is_cell:bool, config:Configurable, user_ns:Dict[str,Any])->Tuple[str,Dict[str,Any]]:
trimmed_kql = code
trimmed_kql = trimmed_kql.strip()
suppress_results = False
if trimmed_kql.endswith(";"):
suppress_results = not is_cell
if is_cell:
lines = trimmed_kql.splitlines(True)
if lines[-1].strip() == ";":
suppress_results = True
if suppress_results:
trimmed_kql = trimmed_kql[:-1].strip()
words = trimmed_kql.split()
properties = {}
table = options = cls.default_options.copy()
if not words:
return ("", options)
num_words = len(words)
first_word = 0
if num_words - first_word >= 2 and words[first_word + 1] == "<<":
options["result_var"] = words[first_word]
trimmed_kql = trimmed_kql[trimmed_kql.find("<<") + 2:]
first_word += 2
obj = None
key = None
opt_key = None
key_state = True
option_type = None
is_option = True
is_property = False
skip_words_count = 0
for word in words[first_word:]:
if key_state:
if skip_words_count == 0:
_comment, skip_words_count = cls._parse_comment(word, trimmed_kql)
if skip_words_count > 0:
skip_words_count -= 1
trimmed_kql = trimmed_kql[trimmed_kql.find(word) + len(word):]
continue
is_option = word.startswith("-")
is_property = word.startswith("+")
option_type = "option" if is_option else "query property"
if not is_option and not is_property:
break
# validate it is not a command
if is_option and word.startswith("--"):
raise ValueError(f"invalid {option_type} '{word}', cannot start with a bouble hyphen-minus")
trimmed_kql = trimmed_kql[trimmed_kql.find(word) + len(word):]
word = word[1:]
bool_value = True
if word[0].startswith("!"):
bool_value = False
word = word[1:]
if "=" in word:
parts = word.split("=", 1)
key = parts[0]
value = parts[1]
else:
key = word
value = None
if is_option:
lookup_key = key.lower().replace("-", "").replace("_", "")
obj = cls._OPTIONS_TABLE.get(lookup_key)
table = options
else:
lookup_key = key.lower()
obj = cls._QUERY_PROPERTIES_TABLE.get(lookup_key)
table = properties
if obj is not None:
if obj.get("abbreviation") is not None:
obj = cls._OPTIONS_TABLE.get(obj.get("abbreviation"))
if obj.get("flag") in config.read_only_trait_names:
raise ValueError(f"{option_type} {key} is readony, cannot be set")
_type = obj.get("type")
opt_key = obj.get("flag") or lookup_key
if _type == "bool" and value is None:
table[opt_key] = bool_value
else:
if not bool_value:
raise ValueError(f"{option_type} {key} cannot be negated")
if value is not None:
table[opt_key] = cls._parse_value("options" if is_option else "query properties", obj, key, value, user_ns)
else:
key_state = False
else:
raise ValueError(f"unknown {option_type} '{key}'")
else:
trimmed_kql = trimmed_kql[trimmed_kql.find(word) + len(word):]
table[opt_key] = cls._parse_value("options", obj, key, word, user_ns)
key_state = True
first_word += 1
# validate using config traits
if key_state and is_option:
cls._validate_config_trait("options", obj, key, options.get(opt_key), config)
if not key_state:
raise ValueError(f"{option_type} '{opt_key}' must have a value")
if options.get("query_properties"):
properties.update(options["query_properties"])
options["query_properties"] = properties
if suppress_results:
options["suppress_results"] = True
return (trimmed_kql.strip(), options)