in python/shared/research_pacs/shared/dicom_json.py [0:0]
def translate_query_to_jsonpath(query):
"""
Transform a DICOM query to a PostgreSQL JSONPath query. For example: `Modality Equals CT
AND (Manufacturer Equals GE* OR Manufacturer Equals Philips*)` returns `@.00080060 like_regex
"CT" flag "i" && (@.00080070 like_regex "GE.*" flag "i" || @.00080070 like_regex "Philips.*"
flag "i")`
Args:
query (str): Input query
"""
logger.debug(f'Translating the query "{query}" to a JSON Path query')
try:
# If the query is empty, return an empty JSONPath query
if query.strip() == '':
return ''
# RegEx pattern of one query condition (e.g. Tag StrEquals Value)
pattern = (
'([a-zA-Z0-9\.]+) +' # Tag
'(?:' # One of the following
'((?i:(?:Exists|NotExists|Empty|NotEmpty)))|' # Exists|NotExist|Empty|NotEmpty
'((?i:(?:NbEquals|NbNotEquals|NbGreater|NbLess))) +([0-9]+(?:\.[0-9]+)?)|' # NbEquals|NbNotEquals|NbGreater|NbLess number
'((?i:(?:StrEquals|StrNotEquals))) +(?:([^"() ]+)|"([^"]*)")' # StrEquals|StrNotEquals string
')'
)
pg_query = ''
previous_left = 0
str_between_conditions = ''
# Parse string between query conditions. Example: In `PatientName StrEquals A OR PatientName
# StrEquals B`, ` OR ` is a string between two conditions.
def parse_between_conditions(string):
new_string = string.lower().replace(' ', '').replace('and', ' && ').replace('or', ' || ')
for char in new_string:
assert char in '()&| '
return new_string
# For each condition found in the query string
conditions = re.finditer(pattern, query)
for condition in conditions:
cur_left, cur_right = condition.span()
# Parse the string at the left of this condition
if previous_left < cur_left:
tmp = parse_between_conditions(query[previous_left:cur_left])
pg_query += tmp
str_between_conditions += tmp
previous_left = cur_right
# Get the tag of the element corresponding to the keyword(s `condition.group(1)` contains the
# condition tag. For example, `RequestedProcedureCodeSequence.CodeMeaning` gives
# `00321064.00080104`
tag_hexa_split = []
tag_keywords = condition.group(1)
for tag_keyword in tag_keywords.split('.'):
tag_int = pydicom.datadict.tag_for_keyword(tag_keyword)
if tag_int != None:
hexa = hex(tag_int)[2:].upper().zfill(8)
tag_hexa_split.append(hexa)
else:
tag_hexa_split.append(tag_keyword)
tag_hexa = '.'.join(tag_hexa_split)
# If the operator is Exists, NotExists, Empty or NotEmpty
# `condition.group(2)` contains the operator
operator = condition.group(2)
if operator != None:
if operator.lower() == 'exists':
pg_query += f'exists(@.{tag_hexa})'
elif operator.lower() == 'notexists':
pg_query += f'!(exists(@.{tag_hexa}))'
elif operator.lower() == 'empty':
pg_query += f'@.{tag_hexa} == ""'
elif operator.lower() == 'notempty':
pg_query += f'!(@.{tag_hexa} == "")'
# If the operator is NbEquals, NbNotEquals, Greater or Less
# `condition.group(3)` contains the operator that manipulates numbers
# `condition.group(4)` contains the value
operator = condition.group(3)
if operator != None:
nb_value = condition.group(4)
if operator.lower() == 'nbequals':
pg_query += f'@.{tag_hexa}.double() == {nb_value}'
elif operator.lower() == 'nbnotequals':
pg_query += f'!(@.{tag_hexa}.double() == {nb_value})'
elif operator.lower() == 'nbgreater':
pg_query += f'@.{tag_hexa}.double() > {nb_value}'
elif operator.lower() == 'nbless':
pg_query += f'@.{tag_hexa}.double() < {nb_value}'
# If the operator is StrEquals or StrNotEquals
# `condition.group(5)` contains the operator that manipulates strings
# `condition.group(6)` or `condition.group(7)` contains the value
operator = condition.group(5)
if operator != None:
str_value = condition.group(7) if condition.group(6) is None else condition.group(6)
# The value must be contain ' or " characters
assert not ("'" in str_value or '"' in str_value), "The condition value cannot contain \" or '"
escape_str_value = re.escape(str_value).replace('\*', '.*')
reg_value = f'^{escape_str_value}$'
if operator.lower() == 'strequals':
pg_query += f'@.{tag_hexa} like_regex "{reg_value}" flag "i"'
elif operator.lower() == 'strnotequals':
pg_query += f'!(@.{tag_hexa} like_regex "{reg_value}" flag "i"'
# Parse the string at the right of the last condtion
if previous_left < len(query):
tmp = parse_between_conditions(query[previous_left:])
pg_query += tmp
str_between_conditions += tmp
# Check if there is the small number of opening and closing parathesis and if there is no
# empty parenthesis block
assert str_between_conditions.count('(') == str_between_conditions.count(')')
assert str_between_conditions.count('()') == 0
return pg_query
except Exception as e:
err_msg = f'Failed to translate the query "{query}" to a JSONPath query - {e}'
logger.debug(err_msg)
raise ValueError(err_msg)