main_flow.py

from metaflow import Parameter, step, FlowSpec, schedule, pypi


# Runs each day at 10 AM UTC
@schedule(cron='0 10 * * ? *', timezone='Etc/UTC')
class SearchTermDataValidationFlow(FlowSpec):
    '''
    A flow that checks some aggregate metrics on the day's search traffic
    (query length, capitalization, language classification) against historic
    metrics to detect changes in the makeup of search requests to Firefox.
    '''
    data_validation_origin = Parameter(
        'data_validation_origin',
        help='The table from which to draw the data for validation',
        required=True,
        default='moz-fx-data-shared-prod.search_terms.sanitization_job_data_validation_metrics',
        type=str,
    )

    data_validation_reporting_destination = Parameter(
        'data_validation_reporting_destination',
        help='The table into which to put the validation results',
        required=True,
        default='moz-fx-data-shared-prod.search_terms_derived.search_term_data_validation_reports_v1',
        type=str,
    )

    @step
    def start(self):
        '''
        Metaflow flows must begin with a function called 'start.'
        So here's the start function. It prints out the input parameters
        to the job.
        '''
        import os

        print(os.getenv('GOOGLE_APPLICATION_CREDENTIALS'))
        print(f"Data Validation Origin: {self.data_validation_origin}")
        print(f"Data Validation Reporting Destination: {self.data_validation_reporting_destination}")
        self.next(self.retrieve_metrics)

    @pypi(
        python='3.10.11',
        packages={
            'pandas': '2.1.4',
            'google-api-core': '2.19.0',
            'google-cloud-storage': '2.16.0',
            'google-cloud-bigquery': '3.25.0',
            'db-dtypes': '1.2.0',
            'wandb': '0.16.6',
        }
    )
    @step
    def retrieve_metrics(self):
        '''
        Retrieves search term sanitization aggregate metrics from BigQuery,
        then checks that they have not varied outside appreciable tolerances
        over the past X days ('X' is a window set for each metric).
        '''
        print("Retrieving Data Validation Metrics Now...")
        from data_validation import retrieve_data_validation_metrics

        self.validation_df = retrieve_data_validation_metrics(self.data_validation_origin)
        self.next(self.record_results)

    @pypi(
        python='3.10.11',
        packages={
            'pandas': '2.1.4',
            'google-api-core': '2.19.0',
            'google-cloud-storage': '2.16.0',
            'google-cloud-bigquery': '3.25.0',
            'db-dtypes': '1.2.0',
            'wandb': '0.16.6',
        }
    )
    @step
    def record_results(self):
        '''
        Writes the validation metrics calculated in the prior step into a
        BigQuery table. That table feeds a Looker dashboard, complete with
        alerts to notify data scientists of any changes that require manual
        inspection.
        '''
        print(f"Input Dataframe Shape: {self.validation_df.shape}")
        print("Recording validation results...")
        from data_validation import record_validation_results

        record_validation_results(self.validation_df, self.data_validation_reporting_destination)
        self.next(self.end)

    @step
    def end(self):
        '''
        Metaflow flows end with a function called 'end.'
        So here's the end function. It prints an encouraging message.
        We could all use one every now and then.
        '''
        print('That was easy!')


if __name__ == '__main__':
    SearchTermDataValidationFlow()
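
# How to run: Metaflow generates CLI flags from the Parameter declarations
# above, so the flow can be exercised locally with the standard Metaflow CLI
# (the table names shown are just the declared defaults):
#
#   python main_flow.py run
#   python main_flow.py run --data_validation_origin <origin_table> \
#       --data_validation_reporting_destination <destination_table>
#
# The @schedule decorator only takes effect once the flow is deployed to a
# production orchestrator (e.g. `python main_flow.py argo-workflows create`,
# where the cron/timezone arguments are honored).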
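
# For reference: the two helpers imported inside the steps above come from a
# separate `data_validation` module that is not part of this file. Below is a
# minimal sketch of the shape those functions plausibly take, assuming
# standard google-cloud-bigquery calls; the bodies are illustrative
# assumptions, not the actual implementation.

import pandas as pd
from google.cloud import bigquery


def retrieve_data_validation_metrics(metrics_table: str) -> pd.DataFrame:
    # `metrics_table` is a fully qualified table name, e.g. the flow's
    # `data_validation_origin` parameter.
    client = bigquery.Client()
    query = f'SELECT * FROM `{metrics_table}`'  # the real query presumably filters by date
    return client.query(query).to_dataframe()


def record_validation_results(validation_df: pd.DataFrame, destination_table: str) -> None:
    # Append the computed validation results to the reporting table that
    # backs the Looker dashboard.
    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(write_disposition='WRITE_APPEND')
    client.load_table_from_dataframe(
        validation_df, destination_table, job_config=job_config
    ).result()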