in data_extraction_transformation/scripts/one_time_use_scripts/extract_signatures_properties.py [0:0]
# (output key, input column) pairs for per-signature properties. The first
# value found in the input column is recorded under the output key.
_SIGNATURE_PROPERTY_COLUMNS = (
    ("repository", "repository_name"),
    ("framework", "framework_id"),
    ("platform", "machine_platform"),
    ("option_collection", "option_collection_hash"),
    ("test", "test"),
    ("suite", "suite"),
    ("application", "application"),
    ("lower_is_better", "lower_is_better"),
    ("parent_signature", "parent_signature"),
    ("has_subtests", "has_subtests"),
    ("suite_public_name", "single_alert_series_signature_suite_public_name"),
    ("test_public_name", "single_alert_series_signature_test_public_name"),
    ("tags", "tags"),
    ("extra_options", "extra_options"),
    ("measurement_unit", "measurement_unit"),
    ("should_alert", "should_alert"),
)

# Keys that are always emitted with a None value.
_UNSET_PROPERTY_KEYS = (
    "last_updated",
    "min_back_window",
    "max_back_window",
    "fore_window",
    "alert_threshold",
    "alert_change_type",
)

# (output key, status label) pairs counted in "alert_summary_status_general".
_ALERT_STATUS_COUNT_KEYS = (
    ("fn_count", "FN"),
    ("fp_count", "FP"),
    ("sp_count", "SP"),
    ("tp_count", "TP"),
)


def _series_statistics(values):
    """Return descriptive statistics of the numeric Series *values* as a dict."""
    mean = values.mean()
    std = values.std()
    minimum = values.min()
    maximum = values.max()
    # Harmonic and geometric means are only defined for strictly positive data.
    all_positive = bool((values > 0).all())
    # Relative frequency of each distinct value, used for the entropy.
    probabilities = values.value_counts() / len(values)

    stats = dict()
    stats["min"] = minimum
    stats["max"] = maximum
    stats["mean"] = mean
    stats["median"] = values.median()
    stats["std"] = std
    stats["variance"] = values.var()
    # Interquartile range: spread between the third and first quartiles.
    stats["iqr"] = values.quantile(0.75) - values.quantile(0.25)
    stats["skewness"] = values.skew()
    stats["kurtosis"] = values.kurt()
    # Undefined when the mean is zero.
    stats["coefficient_of_variation"] = std / mean if mean != 0 else None
    stats["range"] = maximum - minimum
    # Mean absolute deviation around the mean, computed explicitly because
    # Series.mad() is not available in recent pandas releases.
    stats["mad"] = values.sub(mean).abs().mean()
    # Harmonic mean is the reciprocal of the mean of the reciprocals; the
    # float division also keeps integer data from being truncated.
    stats["harmonic_mean"] = 1.0 / (1.0 / values).mean() if all_positive else None
    # Geometric mean via the exponential of the mean logarithm.
    stats["geometric_mean"] = np.exp(np.log(values).mean()) if all_positive else None
    stats["autocorrelation_lag1"] = values.autocorr(lag=1)
    stats["autocorrelation_lag2"] = values.autocorr(lag=2)
    stats["percentile_10"] = values.quantile(0.10)
    stats["percentile_90"] = values.quantile(0.90)
    # Shannon entropy (base 2) of the distribution of distinct values.
    stats["entropy"] = -(probabilities * np.log2(probabilities)).sum()
    return stats


def process_sig_df(dataf, signature_id):
    """Summarise the rows of one signature into a flat dict of characteristics.

    The result holds, in this order: the first value of each descriptive
    column, a set of keys that are always None, the number of rows and of
    distinct revisions, the per-status alert counts, and descriptive
    statistics of the "value" column.

    Args:
        dataf: DataFrame with the rows of a single signature. It must provide
            the descriptive columns listed in _SIGNATURE_PROPERTY_COLUMNS plus
            "revision", "push_timestamp", "alert_summary_status_general" and
            "value".
        signature_id: Identifier printed when the frame cannot be processed;
            it is not stored in the result.

    Returns:
        The dict of characteristics, or an empty dict when any step fails
        (for example a missing column or an empty frame).
    """
    sig_characteristics = dict()
    try:
        for key, column in _SIGNATURE_PROPERTY_COLUMNS:
            sig_characteristics[key] = dataf[column].unique().tolist()[0]
        for key in _UNSET_PROPERTY_KEYS:
            sig_characteristics[key] = None

        sig_characteristics["timeseries_length"] = len(dataf)
        sig_characteristics["nb_unique_revisions"] = len(dataf["revision"].unique())

        # Count each status once per distinct push_timestamp value.
        unique_push_dataf = dataf.drop_duplicates(subset=["push_timestamp"])
        statuses = unique_push_dataf["alert_summary_status_general"]
        for key, label in _ALERT_STATUS_COUNT_KEYS:
            sig_characteristics[key] = int((statuses == label).sum())

        sig_characteristics.update(_series_statistics(dataf["value"]))
    except Exception:
        # Best effort: report the offending signature and let the caller go
        # on. KeyboardInterrupt and SystemExit are deliberately not caught.
        print(signature_id)
        return dict()
    return sig_characteristics