def apply_transformations()

in python/de-identifier/research_pacs/de_identifier/dicom.py [0:0]


  def apply_transformations(self, logs):
    """
    Apply the transformations in `self._transformations` to the DICOM file. The transformations 
    are applied in the following order:
    - ShiftDateTime
    - RandomizeText
    - RandomizeUID
    - AddTags
    - RemoveBurnedInAnnotations
    - DeleteTags
    - Transcode. This does not alter the DICOM file but returns a transfer syntax UID to which 
      the de-identified DICOM file will be transcoded with Orthanc.
    
    Args:
      logs (dict): Dict where logs should be added. Each transformation applied is appended to 
        the list `logs['TransformationsApplied'][<transformation name>]`
    
    Returns:
      The transfer syntax UID to which the DICOM file should be transcoded, or `None` if the 
      current transfer syntax of `self.dicom` already equals the destination transfer syntax.
    
    Raises:
      Exception: If any transformation fails. The message starts with the last action attempted 
        followed by the original error message, and the original exception is chained as the 
        cause.
    
    """
    
    def _log_t(transformation, value):
      """
      Append `value` to the list of entries logged for `transformation` in the log dict.
      
      """
      logs.setdefault('TransformationsApplied', {}).setdefault(transformation, []).append(value)
      
    def _process_each_elem_item(f, elem, *args):
      """
      If the element contains multiple items, process each item with the function `f`. Otherwise,
      process its single item value with `f`. `f` is called as `f(elem, item_value, *args)` and 
      returns the new value of the element item.
      
      """
      if isinstance(elem.value, pydicom.multival.MultiValue):
        # Items are replaced in place by index, so the number of items never changes
        for i in range(len(elem.value)):
          elem.value[i] = f(elem, elem.value[i], *args)
      else:
        elem.value = f(elem, elem.value, *args)
    
    def _get_new_value_from_mapping(t, value_type, old_value, new_value):
      """
      If `ReuseMapping` is specified in `t`, check if a mapping already exists in the database, 
      and return the existing value in that case. Otherwise, create a new mapping in the database 
      between `old_value` and `new_value` if `ReuseMapping` is specified, and return the new 
      value.
      
      Args:
        t: dict that may contain a `ReuseMapping` attribute
        value_type (str): The type of data (`TEXT` or `DATETIME`)
        old_value (str): The original value of the DICOM data element
        new_value (str): The value of the DICOM data element after de-identification
        
      Raises:
        Exception: If the value that identifies the mapping scope is an empty string
        
      """
      if 'ReuseMapping' not in t:
        return new_value
      
      reuse_mapping = t['ReuseMapping']
      if reuse_mapping == 'Always':
        scope_type = 'always'
        scope_value = 'always'
      elif reuse_mapping == 'SamePatient':
        # PatientID may already have been randomized: use the original value if it was recorded
        scope_type = 'patient'
        scope_value = _old_patient_id if _old_patient_id is not None else self.dicom.PatientID
      elif reuse_mapping == 'SameStudy':
        scope_type = 'study'
        scope_value = self.dicom.StudyInstanceUID
      elif reuse_mapping == 'SameSeries':
        scope_type = 'series'
        scope_value = self.dicom.SeriesInstanceUID
      else:
        # The scope value is a SOPInstanceUID, so the mapping must live in its own scope type 
        # and not be mixed with the mappings whose scope is a study
        scope_type = 'instance'
        scope_value = self.dicom.SOPInstanceUID
      if scope_value == '':
        raise Exception('The scope value for ReuseMapping must not be empty')
      return self._db_mapping.add_or_get_mapping(value_type, old_value, new_value, scope_type, scope_value)
      
    dst_transfer_syntax = self._src_transfer_syntax
    _last_action = ''  # This is used for debugging if an exception is raised
    _old_patient_id = None  # Keep track of the initial PatientID value if it is changed
    try:
      
      ### ShiftDateTime
      if 'ShiftDateTime' in self._transformations:
        _last_action = 'ShiftDateTime'
        
        def shift_date_time(elem, item_value, elem_full_tag, t):
          """
          Converts the string value to a datetime object and shift by a random number between 
          `-ShiftBy` and `+ShiftBy` of days if it is a DA, or of seconds if it is a DT or TM.
          
          Args:
            elem: pydicom DataElement
            item_value: Value of the element item to process
            elem_full_tag (str): Full path to the element
            t: dict for the current transformation
            
          Returns:
            The shifted value, or the value previously mapped to the original value if 
            `ReuseMapping` is specified in `t`
            
          """
          old_value = str(item_value)
          shift_value = random.randint(-t['ShiftBy'], +t['ShiftBy'])
          
          # NOTE(review): TM and DT values are parsed with a fixed full-precision format. Values 
          # with omitted components (e.g. `HHMM`) are probably not handled as intended - confirm
          
          # If VR is DA, shift the date by `shift_value` days
          if elem.VR == 'DA':
            old_date = datetime.datetime.strptime(old_value, '%Y%m%d')
            new_date = old_date + datetime.timedelta(days=shift_value)
            new_value = new_date.strftime('%Y%m%d')
            
          # If VR is TM, shift the time by `shift_value` seconds. Fractional seconds are dropped
          elif elem.VR == 'TM':
            old_date = datetime.datetime.strptime(old_value[:6], '%H%M%S')
            new_date = old_date + datetime.timedelta(seconds=shift_value)
            new_value = new_date.strftime('%H%M%S')
            
          # If VR is DT, shift the date and time by `shift_value` seconds. Anything after the 
          # first 14 characters is dropped
          else:
            old_date = datetime.datetime.strptime(old_value[:14], '%Y%m%d%H%M%S')
            new_date = old_date + datetime.timedelta(seconds=shift_value)
            new_value = new_date.strftime('%Y%m%d%H%M%S')
          
          final_value = _get_new_value_from_mapping(t, 'DATETIME', old_value, new_value)
          _log_t('ShiftDateTime', f"Tag={elem_full_tag} OldValue={old_value} NewValue={final_value}")
          return final_value
          
        for t in self._transformations['ShiftDateTime']:
          for elem, elem_full_tag, parent_elem in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
            _last_action = f'ShiftDateTime Tag={elem_full_tag}'
            # Ignore the element if it does not hold a date or a time, or if it is empty
            if elem.VR in ('DA', 'DT', 'TM') and not elem.is_empty:
              _process_each_elem_item(shift_date_time, elem, elem_full_tag, t)
      
      ### RandomizeText
      if 'RandomizeText' in self._transformations:
        _last_action = 'RandomizeText'
        
        def randomize_text(elem, item_value, elem_full_tag, t):
          """
          Split the original item value if a separator is specified by `Split`, replace each 
          non-empty part by a random 8-character string, and rejoin the parts with the same 
          separator.
          
          Args:
            elem: pydicom DataElement
            item_value: Value of the element item to process
            elem_full_tag (str): Full path to the element
            t: dict for the current transformation
            
          Returns:
            The randomized value. Each part is the value previously mapped to the original part 
            if `ReuseMapping` is specified in `t`
            
          """
          old_value_before_split = str(item_value)
          
          # `Split` is the separator string. The value is split only if a non-empty string is 
          # provided: any other value (missing, `None`, empty string) means "do not split"
          separator = t.get('Split')
          must_split = isinstance(separator, str) and separator != ''
          if must_split:
            old_value_after_split = old_value_before_split.split(separator)
          else:
            old_value_after_split = [old_value_before_split]
          new_value_before_join = []
          
          for old_value in old_value_after_split:
            # Empty parts are kept empty so that the structure of the value is preserved
            if old_value == '':
              new_value = ''
            else:
              old_value = old_value.lower() if t['IgnoreCase'] is True else old_value
              random_value = ''.join(random.choice(string.ascii_letters + string.digits) for i in range(8))
              new_value = _get_new_value_from_mapping(t, 'TEXT', old_value, random_value)
            new_value_before_join.append(new_value)
          
          final_value = separator.join(new_value_before_join) if must_split else new_value_before_join[0]
          _log_t('RandomizeText', f"Tag={elem_full_tag} OldValue={old_value_before_split} NewValue={final_value}")
          return final_value
        
        for t in self._transformations['RandomizeText']:
          for elem, elem_full_tag, parent_elem in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
            _last_action = f'RandomizeText Tag={elem_full_tag}'
            if not elem.is_empty:
              # Remember the original PatientID before it is replaced, because it is used as 
              # the scope value when `ReuseMapping` equals `SamePatient`
              if elem_full_tag == '00100020':
                _old_patient_id = elem.value
              _process_each_elem_item(randomize_text, elem, elem_full_tag, t)
      
      ### RandomizeUID
      if 'RandomizeUID' in self._transformations:
        _last_action = 'RandomizeUID'
        
        def randomize_uid(elem, item_value, elem_full_tag, t):
          """
          Replaces the old UID by a new UID. If the old UID already exists in the mapping 
          table of the database, it is always replaced by the same UID.
          
          Args:
            elem: pydicom DataElement
            item_value: Value of the element item to process
            elem_full_tag (str): Full path to the element
            t: dict for the current transformation. It may contain a `Prefix` attribute that 
              is used as the prefix of the generated UID
            
          Returns:
            The UID that replaces the original UID
            
          """
          old_uid = str(item_value)
          random_uid = pydicom.uid.generate_uid(prefix=t['Prefix']) if 'Prefix' in t else pydicom.uid.generate_uid()
          new_uid = self._db_mapping.add_or_get_mapping('UID', old_uid, random_uid, 'always', 'always')
          # Update the tag value, and the meta header tag MediaStorageSOPInstanceUID if the 
          # current element is SOPInstanceUID
          if elem_full_tag == '00080018':
            self.dicom.file_meta.MediaStorageSOPInstanceUID = new_uid
          _log_t('RandomizeUID', f"Tag={elem_full_tag} OldValue={old_uid} NewValue={new_uid}")
          return new_uid
        
        for t in self._transformations['RandomizeUID']:
          for elem, elem_full_tag, parent_elem in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
            _last_action = f'RandomizeUID Tag={elem_full_tag}'
            # Ignore the element if its VR is not UI
            if elem.VR == 'UI' and not elem.is_empty:
              _process_each_elem_item(randomize_uid, elem, elem_full_tag, t)
      
      ### AddTags
      if 'AddTags' in self._transformations:
        _last_action = 'AddTags'
        for t in self._transformations['AddTags']:
          _last_action = f"AddTags Tag={t['Tag']}"
          for parent_elem, tag_int in dicom_tp.enumerate_parent_elements(self.dicom, t['Tag']):
            # Keep the existing element unless the transformation allows overwriting it
            if tag_int in parent_elem and t['OverwriteIfExists'] is False:
              continue
            new_elem = pydicom.dataelem.DataElement(tag_int, t['VR'], t['Value'])
            parent_elem.add(new_elem)
          _log_t('AddTags', f"Tag={t['Tag']}")

      ### RemoveBurnedInAnnotations
      if 'RemoveBurnedInAnnotations' in self._transformations:
        _last_action = 'RemoveBurnedInAnnotations'
        pixels = self.dicom.pixel_array
        width, height = rpacs_dicom_util.get_dimensions(self.dicom)
        samples_per_pixel = rpacs_dicom_util.get_samples_per_pixel(self.dicom)
        _last_action = f'RemoveBurnedInAnnotations Step=CreateMask PixelArrayShape={pixels.shape} Width={width} Height={height} SamplesPerPixel={samples_per_pixel}'
        
        # Generate a mask that will be used to replace boxes to mask with black pixels. The mask 
        # contains only "1" values first, and will be set to "0" later for pixels to obscure. 
        # The frame and channel axes of the mask have a length of 1 so that the same mask is 
        # broadcast to every frame and every channel when it is multiplied with the pixels
        if pixels.ndim == 4:
          # (frames, Y, X, channel)
          mask = np.ones((1, height, width, 1), dtype=np.uint8)
        elif pixels.ndim == 3 and pixels.shape[2] == samples_per_pixel:
          # (Y, X, channel)
          mask = np.ones((height, width, 1), dtype=np.uint8)
        elif pixels.ndim == 3:
          # (frames, Y, X)
          mask = np.ones((1, height, width), dtype=np.uint8)
        else:
          # (Y, X)
          mask = np.ones((height, width), dtype=np.uint8)
        
        for t in self._transformations['RemoveBurnedInAnnotations']:
          if 'BoxCoordinates' in t:
            for box in t['BoxCoordinates']:
              box_left, box_top, box_right, box_bottom = box
              _last_action = f'RemoveBurnedInAnnotations Step=EditMask PixelArrayShape={pixels.shape} MaskShape={mask.shape} Box=({box_left}, {box_top}, {box_right}, {box_bottom})'
              # Clamp the box to the image boundaries
              # NOTE(review): the slices below exclude `box_right` and `box_bottom`, and both 
              # are clamped to `width-1` / `height-1`, so the last column and the last row 
              # are never obscured - confirm this is the intended behavior
              box_left = max(0, min(width-1, box_left))
              box_right = max(0, min(width-1, box_right))
              box_top = max(0, min(height-1, box_top))
              box_bottom = max(0, min(height-1, box_bottom))
              # Put zeros in the mask where pixels must be obscured
              if pixels.ndim == 4:
                mask[0, box_top:box_bottom, box_left:box_right, 0] = 0
              elif pixels.ndim == 3 and pixels.shape[2] == samples_per_pixel:
                mask[box_top:box_bottom, box_left:box_right, 0] = 0
              elif pixels.ndim == 3:
                mask[0, box_top:box_bottom, box_left:box_right] = 0
              else:
                mask[box_top:box_bottom, box_left:box_right] = 0
              _log_t('RemoveBurnedInAnnotations', f"Type={t['Type']} Box=({box_left}, {box_top}, {box_right}, {box_bottom})")
        
        # Apply the mask and update the DICOM image tags accordingly
        _last_action = f'RemoveBurnedInAnnotations Step=ApplyMask PixelArrayShape={pixels.shape} MaskShape={mask.shape}'
        new_pixels = mask * pixels
        self.dicom.PixelData = new_pixels.tobytes()
        # The pixel data now uses the full item size of the pixel array
        self.dicom.BitsAllocated = pixels.itemsize*8
        self.dicom.BitsStored = pixels.itemsize*8
        self.dicom.HighBit = pixels.itemsize*8-1
        if samples_per_pixel > 1:
          self.dicom.PlanarConfiguration = 0
      
      ### DeleteTags
      if 'DeleteTags' in self._transformations:
        _last_action = 'DeleteTags'
        for t in self._transformations['DeleteTags']:
          for elem, elem_full_tag, parent_elem in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
            _last_action = f'DeleteTags Tag={elem_full_tag}'
            # `Remove` deletes the element from its parent. Any other action keeps the element 
            # and clears its value
            if t['Action'] == 'Remove':
              del parent_elem[elem.tag]
            else:
              elem.clear()
            _log_t('DeleteTags', f"Tag={elem_full_tag} Action={t['Action']}")
      
      ### Transcode
      if 'Transcode' in self._transformations:
        dst_transfer_syntax = self._transformations['Transcode']
        _log_t('Transcode', f"{dst_transfer_syntax}")
  
    except Exception as e:
      # Add the last action attempted to the message and keep the original exception as cause
      raise Exception(f'Last action attempted: {_last_action} - {e}') from e
  
    # Check if the current DICOM file in `self.dicom` should be transcoded to a new transfer 
    # syntax and return the new transfer syntax if that is the case
    return None if self.dicom.file_meta.TransferSyntaxUID == dst_transfer_syntax else dst_transfer_syntax