def translate_query_to_jsonpath()

in python/shared/research_pacs/shared/dicom_json.py [0:0]


def translate_query_to_jsonpath(query):
  """
  Transform a DICOM query to a PostgreSQL JSONPath query. For example: `Modality Equals CT 
  AND (Manufacturer Equals GE* OR Manufacturer Equals Philips*)` returns `@.00080060 like_regex 
  "CT" flag "i" && (@.00080070 like_regex "GE.*" flag "i" || @.00080070 like_regex "Philips.*" 
  flag "i")`
  
  Args:
    query (str): Input query
    
  """
  logger.debug(f'Translating the query "{query}" to a JSON Path query')
  try:
    
    # If the query is empty, return an empty JSONPath query
    if query.strip() == '':
      return ''
  
    # RegEx pattern of one query condition (e.g. Tag StrEquals Value)
    pattern = (
      '([a-zA-Z0-9\.]+) +' # Tag
      '(?:' # One of the following
      '((?i:(?:Exists|NotExists|Empty|NotEmpty)))|' # Exists|NotExist|Empty|NotEmpty 
      '((?i:(?:NbEquals|NbNotEquals|NbGreater|NbLess))) +([0-9]+(?:\.[0-9]+)?)|' # NbEquals|NbNotEquals|NbGreater|NbLess number
      '((?i:(?:StrEquals|StrNotEquals))) +(?:([^"() ]+)|"([^"]*)")' # StrEquals|StrNotEquals string
      ')'
    )
    
    pg_query = ''
    previous_left = 0
    str_between_conditions = ''
    
    # Parse string between query conditions. Example: In `PatientName StrEquals A OR PatientName 
    # StrEquals B`, ` OR ` is a string between two conditions.
    def parse_between_conditions(string):
      new_string = string.lower().replace(' ', '').replace('and', ' && ').replace('or', ' || ')
      for char in new_string:
        assert char in '()&| '
      return new_string
    
    # For each condition found in the query string
    conditions = re.finditer(pattern, query)
    for condition in conditions:
      cur_left, cur_right = condition.span()
      
      # Parse the string at the left of this condition
      if previous_left < cur_left:
        tmp = parse_between_conditions(query[previous_left:cur_left])
        pg_query += tmp
        str_between_conditions += tmp
      previous_left = cur_right
      
      # Get the tag of the element corresponding to the keyword(s `condition.group(1)` contains the 
      # condition tag. For example, `RequestedProcedureCodeSequence.CodeMeaning` gives
      # `00321064.00080104`
      tag_hexa_split = []
      tag_keywords = condition.group(1)
      for tag_keyword in tag_keywords.split('.'):
        tag_int = pydicom.datadict.tag_for_keyword(tag_keyword)
        if tag_int != None:
          hexa = hex(tag_int)[2:].upper().zfill(8)
          tag_hexa_split.append(hexa)
        else:
          tag_hexa_split.append(tag_keyword)
      tag_hexa = '.'.join(tag_hexa_split)
      
      # If the operator is Exists, NotExists, Empty or NotEmpty
      # `condition.group(2)` contains the operator
      operator = condition.group(2)
      if operator != None:
        if operator.lower() == 'exists':
          pg_query += f'exists(@.{tag_hexa})'
        elif operator.lower() == 'notexists':
          pg_query += f'!(exists(@.{tag_hexa}))'
        elif operator.lower() == 'empty':
          pg_query += f'@.{tag_hexa} == ""'
        elif operator.lower() == 'notempty':
          pg_query += f'!(@.{tag_hexa} == "")'
      
      # If the operator is NbEquals, NbNotEquals, Greater or Less
      # `condition.group(3)` contains the operator that manipulates numbers
      # `condition.group(4)` contains the value
      operator = condition.group(3)
      if operator != None:
        nb_value = condition.group(4)
        if operator.lower() == 'nbequals':
          pg_query += f'@.{tag_hexa}.double() == {nb_value}'
        elif operator.lower() == 'nbnotequals':
          pg_query += f'!(@.{tag_hexa}.double() == {nb_value})'
        elif operator.lower() == 'nbgreater':
          pg_query += f'@.{tag_hexa}.double() > {nb_value}'
        elif operator.lower() == 'nbless':
          pg_query += f'@.{tag_hexa}.double() < {nb_value}'
      
      # If the operator is StrEquals or StrNotEquals
      # `condition.group(5)` contains the operator that manipulates strings
      # `condition.group(6)` or `condition.group(7)` contains the value
      operator = condition.group(5)
      if operator != None:
        str_value = condition.group(7) if condition.group(6) is None else condition.group(6)
        # The value must be contain ' or " characters
        assert not ("'" in str_value or '"' in str_value), "The condition value cannot contain \" or '"
        escape_str_value = re.escape(str_value).replace('\*', '.*')
        reg_value = f'^{escape_str_value}$'
        if operator.lower() == 'strequals':
          pg_query += f'@.{tag_hexa} like_regex "{reg_value}" flag "i"'
        elif operator.lower() == 'strnotequals':
          pg_query += f'!(@.{tag_hexa} like_regex "{reg_value}" flag "i"'
  
    # Parse the string at the right of the last condtion
    if previous_left < len(query):
      tmp = parse_between_conditions(query[previous_left:])
      pg_query += tmp
      str_between_conditions += tmp

    # Check if there is the small number of opening and closing parathesis and if there is no 
    # empty parenthesis block
    assert str_between_conditions.count('(') == str_between_conditions.count(')')
    assert str_between_conditions.count('()') == 0
    
    return pg_query

  except Exception as e:
    err_msg = f'Failed to translate the query "{query}" to a JSONPath query - {e}'
    logger.debug(err_msg)
    raise ValueError(err_msg)