def psi()

in src/mlmax/monitoring.py [0:0]


    def psi(
        expected_array: np.ndarray, actual_array: np.ndarray, bins: int
    ) -> float:
        """Calculate the PSI (population stability index) for a single variable.

        Bin edges are derived from ``expected_array`` only. Both arrays are then
        counted against those edges and their per-bin fractions compared.

        Args:
           expected_array: numpy array of original (reference) values
           actual_array: numpy array of new values; it is normalised by its own
              length, so it does not need to match the size of
              ``expected_array``
           bins: number of buckets to split the values into

        Returns:
           psi_value: calculated PSI value

        Note:
           ``buckettype`` is not a parameter of this function; it is read from
           the surrounding scope. ``"bins"`` gives equal-width buckets over the
           range of ``expected_array`` and ``"quantiles"`` gives buckets at the
           percentiles of ``expected_array``. Any other value leaves the edges
           at the raw percentages 0..100.
        """

        def scale_range(input_arr: np.ndarray, new_min: float, new_max: float):
            """Linearly rescale ``input_arr`` so it spans [new_min, new_max]."""
            temp = (new_max - new_min) * (input_arr - np.min(input_arr))
            temp = temp / (np.max(input_arr) - np.min(input_arr))
            temp = temp + new_min
            return temp

        def bin_fractions(values: np.ndarray, edges: np.ndarray) -> np.ndarray:
            """Return the share of ``values`` in each bin, with zeros floored."""
            # np.histogram silently drops anything outside [edges[0], edges[-1]]
            # while we still divide by the full length. Clip first so that
            # out-of-range observations are counted in the outermost bins
            # instead of vanishing from the comparison.
            clipped = np.clip(values, edges[0], edges[-1])
            fractions = np.histogram(clipped, edges)[0] / len(values)
            # An empty bin would make the log term infinite; substitute a very
            # small share instead.
            return np.where(fractions == 0, 0.0001, fractions)

        # Breakpoint [0, 100] with equal bins
        breakpoints = np.arange(0, bins + 1) / bins * 100

        if buckettype == "bins":
            # Equal-width edges between the smallest and largest expected value
            breakpoints = scale_range(
                breakpoints, np.min(expected_array), np.max(expected_array)
            )
        elif buckettype == "quantiles":
            # Edges at the requested percentiles of the expected values
            breakpoints = np.percentile(expected_array, breakpoints)

        # Percentage of count for each bin
        expected_percents = bin_fractions(expected_array, breakpoints)
        actual_percents = bin_fractions(actual_array, breakpoints)

        # PSI = sum over bins of (e - a) * ln(e / a)
        psi_value = np.sum(
            (expected_percents - actual_percents)
            * np.log(expected_percents / actual_percents)
        )

        return psi_value