in src/mlmax/monitoring.py [0:0]
def psi(expected_array, actual_array, bins):
    """Calculate the Population Stability Index (PSI) for a single variable.

    Bin edges are derived from ``expected_array`` only. Both arrays are then
    histogrammed against those edges, converted to per-bin proportions, and
    compared with ``sum((e - a) * ln(e / a))``.

    NOTE(review): ``buckettype`` is not a parameter of this function; it is
    read from the enclosing/module scope. Confirm where it is defined. It
    must be ``"bins"`` (equal-width edges spanning the expected min..max) or
    ``"quantiles"`` (edges at evenly spaced percentiles of the expected
    values).

    NOTE(review): ``np.histogram`` ignores values outside the outermost
    edges, so actual values below the expected minimum or above the expected
    maximum land in no bin while still counting in ``len(actual_array)``.
    Confirm this is the intended treatment of out-of-range drift.

    Args:
        expected_array: numpy array of original (reference) values.
        actual_array: numpy array of new values. Each histogram is
            normalised by its own array's length, so the two lengths need
            not match.
        bins: number of buckets to split the values into (must be >= 1).

    Returns:
        psi_value: calculated PSI value.

    Raises:
        ValueError: if ``bins`` is smaller than 1, or if ``buckettype`` is
            neither ``"bins"`` nor ``"quantiles"``.
    """
    if bins < 1:
        raise ValueError(f"bins must be at least 1, got {bins!r}")

    # Proportion substituted for an empty bin so the log ratio stays finite.
    empty_bin_fill = 0.0001

    def scale_range(input_arr: np.ndarray, new_min: float, new_max: float):
        """Linearly rescale input_arr so that it spans [new_min, new_max]."""
        old_min = np.min(input_arr)
        old_max = np.max(input_arr)
        return (new_max - new_min) * (input_arr - old_min) / (old_max - old_min) + new_min

    def sub_psi(e_perc, a_perc):
        """Return the per-bin PSI terms for arrays of bin proportions.

        Zero proportions on either side are replaced by a very small number
        before taking the log ratio.
        """
        e_perc = np.where(e_perc == 0, empty_bin_fill, e_perc)
        a_perc = np.where(a_perc == 0, empty_bin_fill, a_perc)
        return (e_perc - a_perc) * np.log(e_perc / a_perc)

    # Evenly spaced percent grid over [0, 100] with ``bins`` intervals.
    breakpoints = np.arange(0, bins + 1) / bins * 100
    if buckettype == "bins":
        # Map the percent grid onto the value range of the expected data.
        breakpoints = scale_range(
            breakpoints, np.min(expected_array), np.max(expected_array)
        )
    elif buckettype == "quantiles":
        # Use the expected data's percentiles themselves as the bin edges.
        breakpoints = np.percentile(expected_array, breakpoints)
    else:
        # Previously fell through and used the raw percent grid as edges.
        raise ValueError(
            f"buckettype must be 'bins' or 'quantiles', got {buckettype!r}"
        )

    # Fraction of each array's values falling into each bin.
    expected_percents = np.histogram(expected_array, breakpoints)[0] / len(
        expected_array
    )
    actual_percents = np.histogram(actual_array, breakpoints)[0] / len(actual_array)

    psi_value = np.sum(sub_psi(expected_percents, actual_percents))
    return psi_value