in misc/CCSynth/CC/DataInsights/src/prose/datainsights/_assertion/_assertion_helper.py [0:0]
def validate(self, df, explanation, normalizeViolation=True):
"""Validate ``df`` against the learned invariants.

N = number of data points, I = number of invariants, C = number of columns.

Returns a 4-tuple:
    row_wise_inv_violation (N x I):
        violation degree of each invariant for each row.
    row_wise_inv_compatibility (N x I):
        0/1 mask marking which invariant is compatible with which row.
    row_wise_violation_summary (N x 2, or N x (C + 2) with explanation):
        violation for each row, aggregated over all invariants; without
        explanation it holds the two columns "violation" and "num_of_invs",
        with explanation those two columns are appended to the original
        dataframe.
    row_wise_per_attribute_violation_contribution (N x (C + 2)):
        blame assigned to each attribute for the violation; only populated
        when explanation is requested.

When ``normalizeViolation`` is True, violation degrees are squashed into
[0, 1) via 1 - exp(-d).
"""
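# Helper: map a signed distance beyond an invariant's allowed range to a
# nonnegative violation degree. Negative distances (no violation) are clipped
# to zero, the remainder is scaled element-wise by self.std_dev_inv and, when
# normalizeViolation is set, squashed into [0, 1) via 1 - exp(-d).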
def distance_to_violation_degree(d):
d = np.where(d > 0, d, 0)
d = np.multiply(d, self.std_dev_inv)
if normalizeViolation:
d = 1 - np.exp(-d)
return d
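# Project df into the feature space the invariants are expressed over
# (using the first constrained invariant's transform) and drop rows with
# missing values.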
cur_df = self.constrained_invariants[0].data_assertion._transform(df).dropna()
N, C, I = cur_df.shape[0], cur_df.shape[1], self.inv_matrix.shape[1]
row_wise_inv_violation = pd.DataFrame(np.zeros((N, I)), index=df.index)
row_wise_inv_compatibility = pd.DataFrame(np.zeros((N, I)), index=df.index)
row_wise_violation_summary = pd.DataFrame(
columns=["violation", "num_of_invs"], index=df.index
)
row_wise_per_attribute_violation_contribution = pd.DataFrame(
np.zeros((N, C)), index=cur_df.index, columns=cur_df.columns
)
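# Placeholders for the four outputs: the per-invariant and summary frames are
# aligned to df's index, while the per-attribute blame frame mirrors the
# transformed columns of cur_df.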
######### Computing row_wise_inv_compatibility #########
inv_count = 0
for constrained_inv in self.constrained_invariants:
    # Rows returned by the constraint are the ones this invariant applies to.
    compatible_rows = constrained_inv.constraint.apply(df).index.values
    cur_inv_count = constrained_inv.data_assertion.size()
    # Each constrained invariant owns a consecutive block of invariant
    # columns; .loc label slicing is inclusive, hence the "- 1".
    row_wise_inv_compatibility.loc[
        compatible_rows, inv_count : inv_count + cur_inv_count - 1
    ] = 1
    inv_count += cur_inv_count
########################################################
########### Computing row_wise_inv_violation ###########
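# Weight each compatible invariant by 1 / log(2 + std_dev), then normalize so
# that every row's weights sum to 1 (a 1e-10 floor on the row sum guards rows
# with no compatible invariants against division by zero).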
row_wise_inv_weight = np.multiply(
row_wise_inv_compatibility, 1 / np.log(2 + self.std_dev)
)
row_wise_inv_weight = np.multiply(
row_wise_inv_weight.T,
1
/ np.maximum(
np.full((row_wise_inv_weight.shape[0],), 1e-10),
np.sum(row_wise_inv_weight, axis=1),
),
).T
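# Project each row onto the invariant directions; a projection violates an
# invariant when it lies more than 4 standard deviations from the mean or
# outside the [min, max] bounds, whichever distance is largest.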
s = np.dot(cur_df, self.inv_matrix)
v1 = distance_to_violation_degree(np.abs(s - self.mean) - 4 * self.std_dev)
v2 = distance_to_violation_degree(self.min - s)
v3 = distance_to_violation_degree(s - self.max)
violations = np.maximum(np.maximum(v1, v2), v3)
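# Keep only violations of compatible invariants and apply the per-row,
# per-invariant weights computed above.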
row_wise_inv_violation = pd.DataFrame(
np.multiply(
np.multiply(violations, row_wise_inv_compatibility), row_wise_inv_weight
),
index=df.index,
)
########################################################
########### Computing row_wise_violation_summary ###########
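# num_of_invs counts the invariants compatible with each row; violation is the
# weighted sum of that row's violation degrees (the weights sum to at most 1
# per row, so the aggregate stays within [0, 1) when normalizeViolation is set).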
row_wise_violation_summary["num_of_invs"] = np.sum(
row_wise_inv_compatibility, axis=1
)
row_wise_violation_summary["violation"] = np.sum(row_wise_inv_violation, axis=1)
row_wise_violation_summary = row_wise_violation_summary.fillna(0)
############################################################
if explanation:
# Only compute row_wise_per_attribute_violation_contribution when explanation is required
# Also, update row_wise_violation_summary to include the original dataframe for ease of exposition
(
    row_wise_violation_summary,
    row_wise_per_attribute_violation_contribution,
) = self.update_for_explanation(
    df, row_wise_inv_violation, row_wise_violation_summary
)
return (
row_wise_inv_violation,
row_wise_inv_compatibility,
row_wise_violation_summary,
row_wise_per_attribute_violation_contribution,
)
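# Usage sketch (hypothetical names): given an assertion helper `helper` built
# elsewhere in this module and a dataframe `test_df` to check, the call below
# only illustrates the shape and meaning of the returned frames.
#
#   violation, compatibility, summary, blame = helper.validate(
#       test_df, explanation=True
#   )
#   summary["violation"]    # per-row aggregated violation, in [0, 1) if normalized
#   summary["num_of_invs"]  # number of invariants compatible with each row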