wandb/run-20240613_114939-v6kp8m3p/files/code/main_flow.py (62 lines of code) (raw):
from metaflow import Parameter, step, FlowSpec
import wandb
from data_validation import retrieve_data_validation_metrics, record_validation_results
class SearchTermDataValidationFlow(FlowSpec):
    '''
    Metaflow flow that validates search term data and reports the results.

    Steps run linearly: start -> retrieve_metrics -> record_results -> end.
    The validation dataframe produced in retrieve_metrics is stored on
    self.validation_df so that the later steps can read it.
    '''

    # Name of the table the validation metrics are read from (required).
    data_validation_origin = Parameter(
        'data_validation_origin',
        help='The table from which to draw the data for validation',
        required=True,
        type=str,
    )

    # Name of the table the validation results are written to (required).
    data_validation_reporting_destination = Parameter(
        'data_validation_reporting_destination',
        help='The table into which to put the validation results',
        required=True,
        type=str,
    )

    @step
    def start(self):
        '''
        Metaflow flows must begin with a function called 'start.'
        So here's the start function.
        It prints out the input parameters to the job
        and hands off to the metric retrieval step.
        (The experiment tracking run is created in the 'end' step.)
        '''
        print(f"Data Validation Origin: {self.data_validation_origin}")
        print(f"Data Validation Reporting Destination: {self.data_validation_reporting_destination}")
        self.next(self.retrieve_metrics)

    @step
    def retrieve_metrics(self):
        '''
        Retrieves search term sanitization aggregation data from BigQuery,
        then checks that they have not varied outside appreciable tolerances
        in the past X days ('X' is a window set for each metric).

        The result of retrieve_data_validation_metrics is kept on
        self.validation_df for the following steps.
        '''
        print("Retrieving Data Validation Metrics Now...")
        self.validation_df = retrieve_data_validation_metrics(self.data_validation_origin)
        self.next(self.record_results)

    @step
    def record_results(self):
        '''
        Shoves the validation metrics calculated in the prior step into a BigQuery table.
        That table has a dashboard in looker, complete with alerts
        to notify data scientists if there are any changes that require manual inspection.
        '''
        print(f"Input Dataframe Shape: {self.validation_df.shape}")
        print("Recording validation results...")
        record_validation_results(self.validation_df, self.data_validation_reporting_destination)
        self.next(self.end)

    @step
    def end(self):
        '''
        Metaflow flows end with a function called 'end.'
        So here's the end function. It logs the validation dataframe
        to wandb as a table and then prints an encouraging message.
        We could all use one every now and then.
        '''
        # Use the run as a context manager so it is always finished, even if
        # logging the table raises, rather than relying on process exit.
        with wandb.init(
            project="instep-wandb-search-term-data-validation",
            config={
                "example_metric": "Example Value!"
            },
        ) as run:
            run.log({"search-term-data-validation-df": wandb.Table(dataframe=self.validation_df)})
        print('That was easy!')
# Only construct the flow when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    SearchTermDataValidationFlow()