in python/de-identifier/research_pacs/de_identifier/dicom.py [0:0]
def apply_transformations(self, logs):
    """
    Apply the transformations in `self._transformations` to the DICOM file. The transformations
    are applied in the following order:
    - ShiftDateTime
    - RandomizeText
    - RandomizeUID
    - AddTags
    - RemoveBurnedInAnnotations
    - DeleteTags
    - Transcode. This does not alter the DICOM file but returns a transfer syntax UID to which
      the de-identified DICOM file will be transcoded with Orthanc.

    Args:
      logs (dict): Dict where logs should be added. Each applied transformation is appended to
        `logs['TransformationsApplied'][<transformation name>]`.

    Returns:
      The destination transfer syntax if it differs from
      `self.dicom.file_meta.TransferSyntaxUID`, otherwise `None`.

    Raises:
      Exception: If any transformation step fails. The message starts with the last action
        attempted, and the original exception is chained as `__cause__`.

    """

    def _log_t(transformation, value):
        """
        Add a transformation applied to the log dict.
        """
        logs.setdefault('TransformationsApplied', {}).setdefault(transformation, []).append(value)

    def _process_each_elem_item(f, elem, *args):
        """
        If the element contains multiple items, process each item with the function `f`. Otherwise,
        process its single item value with `f`. `f` is called as `f(elem, item_value, *args)` and
        returns the new value of the element item.
        """
        values = elem.value
        if isinstance(values, pydicom.multival.MultiValue):
            for index, item_value in enumerate(values):
                values[index] = f(elem, item_value, *args)
        else:
            elem.value = f(elem, values, *args)

    def _get_new_value_from_mapping(t, value_type, old_value, new_value):
        """
        If `ReuseMapping` is specified in `t`, check if a mapping already exists in the database,
        and return the existing value in that case. Otherwise, create a new mapping in the database
        between `old_value` and `new_value` if `ReuseMapping` is specified, and return the new
        value.

        Args:
          t: dict that may contain a `ReuseMapping` attribute
          value_type (str): The type of data (`TEXT` or `DATETIME`)
          old_value (str): The original value of the DICOM data element
          new_value (str): The value of the DICOM data element after de-identification

        Raises:
          Exception: If the scope value resolved for `ReuseMapping` is an empty string.

        """
        if 'ReuseMapping' not in t:
            return new_value

        reuse = t['ReuseMapping']
        if reuse == 'Always':
            scope_type = 'always'
            scope_value = 'always'
        elif reuse == 'SamePatient':
            # Use the PatientID as it was before RandomizeText changed it, so that the scope
            # stays the same for every element of the file
            scope_type = 'patient'
            scope_value = _old_patient_id if _old_patient_id is not None else self.dicom.PatientID
        elif reuse == 'SameStudy':
            scope_type = 'study'
            scope_value = self.dicom.StudyInstanceUID
        elif reuse == 'SameSeries':
            scope_type = 'series'
            scope_value = self.dicom.SeriesInstanceUID
        else:
            # Any other value scopes the mapping to this SOP instance. The scope type must not
            # be `study` here, because the scope value is a SOPInstanceUID
            scope_type = 'instance'
            scope_value = self.dicom.SOPInstanceUID

        if scope_value == '':
            raise Exception('The scope value for ReuseMapping must not be empty')
        return self._db_mapping.add_or_get_mapping(value_type, old_value, new_value, scope_type, scope_value)

    dst_transfer_syntax = self._src_transfer_syntax
    _last_action = ''  # This is used for debugging if an exception is raised
    _old_patient_id = None  # Keep track of the initial PatientID value if it is changed

    try:

        ### ShiftDateTime
        if 'ShiftDateTime' in self._transformations:
            _last_action = 'ShiftDateTime'

            def shift_date_time(elem, item_value, elem_full_tag, t):
                """
                Converts the string value to a datetime object and shift it by a random amount
                between `-ShiftBy` and `+ShiftBy` days if it is a DA, or seconds if it is a DT
                or TM.

                Args:
                  elem: pydicom DataElement
                  item_value: Value of the element item to process
                  elem_full_tag (str): Full path to the element
                  t: dict for the current transformation

                """
                old_value = str(item_value)
                shift_value = random.randint(-t['ShiftBy'], +t['ShiftBy'])

                if elem.VR == 'DA':
                    # If VR is DA, shift the date by `shift_value` days
                    text, fmt = old_value, '%Y%m%d'
                    delta = datetime.timedelta(days=shift_value)
                elif elem.VR == 'TM':
                    # If VR is TM, shift the time by `shift_value` seconds. Only the first 6
                    # characters (HHMMSS) are parsed
                    text, fmt = old_value[:6], '%H%M%S'
                    delta = datetime.timedelta(seconds=shift_value)
                else:
                    # If VR is DT, shift the date by `shift_value` seconds. Only the first 14
                    # characters (YYYYMMDDHHMMSS) are parsed
                    text, fmt = old_value[:14], '%Y%m%d%H%M%S'
                    delta = datetime.timedelta(seconds=shift_value)

                new_value = (datetime.datetime.strptime(text, fmt) + delta).strftime(fmt)
                final_value = _get_new_value_from_mapping(t, 'DATETIME', old_value, new_value)
                _log_t('ShiftDateTime', f"Tag={elem_full_tag} OldValue={old_value} NewValue={final_value}")
                return final_value

            for t in self._transformations['ShiftDateTime']:
                for elem, elem_full_tag, _parent in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
                    _last_action = f'ShiftDateTime Tag={elem_full_tag}'
                    if elem.VR in ('DA', 'DT', 'TM') and not elem.is_empty:
                        _process_each_elem_item(shift_date_time, elem, elem_full_tag, t)

        ### RandomizeText
        if 'RandomizeText' in self._transformations:
            _last_action = 'RandomizeText'

            def randomize_text(elem, item_value, elem_full_tag, t):
                """
                Split the original item value if specified by `Split`, replace each part by a random
                8-character string, and rejoin the parts if needed.

                Args:
                  elem: pydicom DataElement
                  item_value: Value of the element item to process
                  elem_full_tag (str): Full path to the element
                  t: dict for the current transformation

                """
                old_value_before_split = str(item_value)

                # `Split` holds the delimiter string. Anything else (missing, None, a boolean or
                # an empty string) cannot be passed to `str.split` and disables splitting
                separator = t.get('Split')
                do_split = isinstance(separator, str) and separator != ''
                ignore_case = t.get('IgnoreCase') is True
                alphabet = string.ascii_letters + string.digits

                old_parts = old_value_before_split.split(separator) if do_split else [old_value_before_split]
                new_parts = []
                for old_value in old_parts:
                    if old_value == '':
                        # Empty parts are kept empty so that the structure of the value is preserved
                        new_parts.append('')
                        continue
                    if ignore_case:
                        old_value = old_value.lower()
                    random_value = ''.join(random.choice(alphabet) for _ in range(8))
                    # Text mappings are stored with the `TEXT` value type, so that they do not
                    # share a namespace with the `DATETIME` mappings created by ShiftDateTime
                    new_parts.append(_get_new_value_from_mapping(t, 'TEXT', old_value, random_value))

                final_value = separator.join(new_parts) if do_split else new_parts[0]
                _log_t('RandomizeText', f"Tag={elem_full_tag} OldValue={old_value_before_split} NewValue={final_value}")
                return final_value

            for t in self._transformations['RandomizeText']:
                for elem, elem_full_tag, _parent in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
                    _last_action = f'RandomizeText Tag={elem_full_tag}'
                    if not elem.is_empty:
                        # Remember the PatientID before it is replaced: it is used as the scope
                        # value for `ReuseMapping: SamePatient`
                        if elem_full_tag == '00100020':
                            _old_patient_id = elem.value
                        _process_each_elem_item(randomize_text, elem, elem_full_tag, t)

        ### RandomizeUID
        if 'RandomizeUID' in self._transformations:
            _last_action = 'RandomizeUID'

            def randomize_uid(elem, item_value, elem_full_tag, t):
                """
                Replaces the old UID by a new UID. If the old UID already exists in the mapping
                table of the database, it is always replaced by the same UID.

                Args:
                  elem: pydicom DataElement
                  item_value: Value of the element item to process
                  elem_full_tag (str): Full path to the element
                  t: dict for the current transformation

                """
                old_uid = str(item_value)
                if 'Prefix' in t:
                    random_uid = pydicom.uid.generate_uid(prefix=t['Prefix'])
                else:
                    random_uid = pydicom.uid.generate_uid()
                new_uid = self._db_mapping.add_or_get_mapping('UID', old_uid, random_uid, 'always', 'always')

                # Update the meta header tag MediaStorageSOPInstanceUID if the current element is
                # SOPInstanceUID
                if elem_full_tag == '00080018':
                    self.dicom.file_meta.MediaStorageSOPInstanceUID = new_uid

                _log_t('RandomizeUID', f"Tag={elem_full_tag} OldValue={old_uid} NewValue={new_uid}")
                return new_uid

            for t in self._transformations['RandomizeUID']:
                for elem, elem_full_tag, _parent in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
                    _last_action = f'RandomizeUID Tag={elem_full_tag}'
                    # Ignore the element if its VR is not UI
                    if elem.VR == 'UI' and not elem.is_empty:
                        _process_each_elem_item(randomize_uid, elem, elem_full_tag, t)

        ### AddTags
        if 'AddTags' in self._transformations:
            _last_action = 'AddTags'
            for t in self._transformations['AddTags']:
                _last_action = f"AddTags Tag={t['Tag']}"
                for parent_elem, tag_int in dicom_tp.enumerate_parent_elements(self.dicom, t['Tag']):
                    # Keep the existing element unless overwriting is allowed
                    if tag_int in parent_elem and t['OverwriteIfExists'] is False:
                        continue
                    parent_elem.add(pydicom.dataelem.DataElement(tag_int, t['VR'], t['Value']))
                    _log_t('AddTags', f"Tag={t['Tag']}")

        ### RemoveBurnedInAnnotations
        if 'RemoveBurnedInAnnotations' in self._transformations:
            _last_action = 'RemoveBurnedInAnnotations'
            pixels = self.dicom.pixel_array
            width, height = rpacs_dicom_util.get_dimensions(self.dicom)
            samples_per_pixel = rpacs_dicom_util.get_samples_per_pixel(self.dicom)
            _last_action = f'RemoveBurnedInAnnotations Step=CreateMask PixelArrayShape={pixels.shape} Width={width} Height={height} SamplesPerPixel={samples_per_pixel}'

            # Work out once which singleton axes surround the (Y, X) plane of the mask, so that
            # the mask broadcasts over frames and/or channels of the pixel array
            if pixels.ndim == 4:
                # (frames, Y, X, channel)
                leading, trailing = (1,), (1,)
            elif pixels.ndim == 3 and pixels.shape[2] == samples_per_pixel:
                # (Y, X, channel)
                leading, trailing = (), (1,)
            elif pixels.ndim == 3:
                # (frames, Y, X)
                leading, trailing = (1,), ()
            else:
                # (Y, X)
                leading, trailing = (), ()

            # Generate a mask that will be used to replace boxes to mask with black pixels. The mask
            # contains only "1" values first, and will be set to "0" later for pixels to obscure
            mask = np.ones(leading + (height, width) + trailing, dtype=np.uint8)

            for t in self._transformations['RemoveBurnedInAnnotations']:
                if 'BoxCoordinates' in t:
                    for box in t['BoxCoordinates']:
                        box_left, box_top, box_right, box_bottom = box
                        _last_action = f'RemoveBurnedInAnnotations Step=EditMask PixelArrayShape={pixels.shape} MaskShape={mask.shape} Box=({box_left}, {box_top}, {box_right}, {box_bottom})'

                        # Clamp the box to the image dimensions
                        box_left = max(0, min(width-1, box_left))
                        box_right = max(0, min(width-1, box_right))
                        box_top = max(0, min(height-1, box_top))
                        box_bottom = max(0, min(height-1, box_bottom))

                        # Put zeros in the mask where pixels must be obscured
                        region = (
                            (0,) * len(leading)
                            + (slice(box_top, box_bottom), slice(box_left, box_right))
                            + (0,) * len(trailing)
                        )
                        mask[region] = 0
                        _log_t('RemoveBurnedInAnnotations', f"Type={t['Type']} Box=({box_left}, {box_top}, {box_right}, {box_bottom})")

            # Apply the mask and update the DICOM image tags accordingly
            _last_action = f'RemoveBurnedInAnnotations Step=ApplyMask PixelArrayShape={pixels.shape} MaskShape={mask.shape}'
            # Multiplying by a uint8 mask can promote the result to a wider dtype. Cast back so
            # that the bytes written match the bit depth derived from `pixels.itemsize` below
            new_pixels = (mask * pixels).astype(pixels.dtype, copy=False)
            bits = pixels.itemsize*8
            self.dicom.PixelData = new_pixels.tobytes()
            self.dicom.BitsAllocated = bits
            self.dicom.BitsStored = bits
            self.dicom.HighBit = bits-1
            if samples_per_pixel > 1:
                self.dicom.PlanarConfiguration = 0

        ### DeleteTags
        if 'DeleteTags' in self._transformations:
            _last_action = 'DeleteTags'
            for t in self._transformations['DeleteTags']:
                for elem, elem_full_tag, parent_elem in dicom_tpp.enumerate_elements_match_tag_path_patterns(self.dicom, t['TagPatterns'], t['ExceptTagPatterns']):
                    _last_action = f'DeleteTags Tag={elem_full_tag}'
                    if t['Action'] == 'Remove':
                        # Remove the element from its parent
                        del parent_elem[elem.tag]
                    else:
                        # Keep the element but clear its value
                        elem.clear()
                    _log_t('DeleteTags', f"Tag={elem_full_tag} Action={t['Action']}")

        ### Transcode
        if 'Transcode' in self._transformations:
            _last_action = 'Transcode'
            dst_transfer_syntax = self._transformations['Transcode']
            _log_t('Transcode', f"{dst_transfer_syntax}")

    except Exception as e:
        # Chain the original exception so that its traceback is not lost
        raise Exception(f'Last action attempted: {_last_action} - {e}') from e

    # Check if the current DICOM file in `self.dicom` should be transcoded to a new transfer
    # syntax, and return the new transfer syntax in that case
    if self.dicom.file_meta.TransferSyntaxUID == dst_transfer_syntax:
        return None
    return dst_transfer_syntax