analysis/download.py (78 lines of code) (raw):

# Copyright (c) Microsoft Corporation. # Licensed under the MIT license. """Download analysis results from a Log Analytics workspace""" import json import logging from datetime import datetime, timedelta from typing import Optional import pandas as pd from jinja2 import Template from scipy.stats import false_discovery_control from .log_analytics import LogAnalyticsWorkspace, query_df from .results import AnalysisMetadata, AnalysisResults, VariantMetadata QUERY_ANALYSIS = """ OEWExperimentScorecards | where FeatureName == '{{ feature_flag }}' {% if allocation_id %}| where AllocationId == '{{ allocation_id }}'{% endif %} | summarize arg_max(TimeGenerated, *) by ScorecardId | summarize arg_max(TimeGenerated, *) by AnalysisEndTime | summarize arg_max(AnalysisEndTime, *) | project ScorecardId, FeatureName, AllocationId, AnalysisStartTime, AnalysisEndTime, Variants=Insights.Assignment | where FeatureName == '{{ feature_flag }}' """.strip() QUERY_SCORECARD = """ OEWExperimentScorecardMetricPairs | where ScorecardId == '{{ scorecard_id }}' | summarize arg_max(TimeGenerated, *) by ScorecardId, MetricId, TreatmentVariant, ControlVariant """.strip() def treatment_effect_assessment(df: pd.DataFrame) -> bool: """Overall assessment of whether a treatment effect was detected""" df_evaluated = df.loc[ lambda df: ~df["MetricCategories"].str.contains("__Internal__") & ~df["TreatmentEffect"].isin(["Zero samples", "Too few samples"]) & (df["TreatmentStandardErrorNormalized"] > 0) & (df["ControlStandardErrorNormalized"] > 0) & (df["PValue"] < 1) & (df["PValue"] >= 0) ] pvalues = df_evaluated["PValue"].to_numpy() if len(pvalues) > 0: pvalue_bh = false_discovery_control(pvalues, method="bh").min() return pvalue_bh <= 0.05 return False def extract_metadata(df: pd.DataFrame) -> Optional[AnalysisMetadata]: """Extract metadata from the scorecard record in Log Analytics.""" if df.empty: return None config = df.iloc[0].to_dict() try: variants = json.loads(config["Variants"]) except (KeyError, json.JSONDecodeError): logging.warning("Variant metadata are unavailable") variants = [] return AnalysisMetadata( feature_flag=config["FeatureName"], allocation_id=config["AllocationId"], scorecard_id=config["ScorecardId"], start_time=config["AnalysisStartTime"].to_pydatetime(), end_time=config["AnalysisEndTime"].to_pydatetime(), variants=[ VariantMetadata( variant=x["Variant"], is_control=x["IsControlVariant"], allocated_pct=x["VariantAssignmentPercentage"], assigned_users=x["AssignmentUserCount"], is_srm=x["SampleRatioMismatchPValue"] <= 0.0005, is_tea=False, # populate later based on metric data ) for x in variants ], ) def latest_analysis( credential, workspace: LogAnalyticsWorkspace, *, feature_flag: str, allocation_id: Optional[str], timespan: timedelta | tuple[datetime, timedelta] | tuple[datetime, datetime] ) -> Optional[AnalysisResults]: """Retrieve results for the latest analysis of an experiment. Parameters ---------- credential: azure.core.credentials.TokenCredential Microsoft Entra ID token for Azure SDK authentication. workspace: LogAnalyticsWorkspace Azure Log Analytics workspace storing analysis results. feature_flag: str Azure App Config feature flag name. allocation_id: Optional[str] Allocation ID for the experiment. If unspecified, the latest experiment is selected. timespan: timedelta | tuple[datetime, timedelta] | tuple[datetime, datetime] Time range to search for the experiment. """ # find the latest experiment analysis (latest allocation + date range) query = Template(QUERY_ANALYSIS).render( feature_flag=feature_flag, allocation_id=allocation_id ) df_analysis = query_df(credential, workspace, query, timespan=timespan) analysis = extract_metadata(df_analysis) if analysis is None: return None # retrieve precomputed scorecard query = Template(QUERY_SCORECARD).render(scorecard_id=analysis.scorecard_id) df_scorecard = query_df(credential, workspace, query, timespan=timespan) results = AnalysisResults(analysis, df_scorecard) # update 'is_tea' for each variant for variant in results.analysis.variants: scorecard_variant = results.scorecard.loc[ results.scorecard["TreatmentVariant"] == variant.variant ] variant.is_tea = treatment_effect_assessment(scorecard_variant) return results