in jobs/search-term-data-validation-v2/search_term_data_validation_v2/data_validation.py [0:0]
def export_data_validation_metrics_to_bigquery(dataframe, destination_table_id):
    """
    Append data validation metrics to the BigQuery table tracking these metrics from job metadata.

    Arguments:
    - dataframe: A dataframe of validation metrics to be added. Column names must
      match the schema declared below.
    - destination_table_id: the fully qualified name of the table for the data to be exported into.

    Returns: Nothing.
    It does print a result value as a cursory logging mechanism. That result object
    can be parsed and logged to wherever we like.
    """
    # NOTE(review): `project` is a module-level name defined elsewhere in this file.
    client = bigquery.Client(project=project)

    # Explicit schema so pandas dtypes are not mis-inferred during serialization.
    schema = [
        bigquery.SchemaField("finished_at", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField(
            "pct_sanitized_search_terms", bigquery.enums.SqlTypeNames.FLOAT64
        ),
        bigquery.SchemaField(
            "pct_sanitized_contained_at", bigquery.enums.SqlTypeNames.FLOAT64
        ),
        bigquery.SchemaField(
            "pct_sanitized_contained_numbers", bigquery.enums.SqlTypeNames.FLOAT64
        ),
        bigquery.SchemaField(
            "pct_sanitized_contained_name", bigquery.enums.SqlTypeNames.FLOAT64
        ),
        bigquery.SchemaField(
            "pct_terms_containing_us_census_surname",
            bigquery.enums.SqlTypeNames.FLOAT64,
        ),
        bigquery.SchemaField(
            "pct_uppercase_chars_all_search_terms", bigquery.enums.SqlTypeNames.FLOAT64
        ),
        bigquery.SchemaField(
            "avg_words_all_search_terms", bigquery.enums.SqlTypeNames.FLOAT64
        ),
        bigquery.SchemaField(
            "pct_terms_non_english", bigquery.enums.SqlTypeNames.FLOAT64
        ),
    ]
    destination_table = bigquery.Table(destination_table_id)

    # insert_rows_from_dataframe performs a streaming insert and returns a list
    # (one entry per chunk) of per-row error mappings — it is NOT a load job,
    # so the previous name `job` was misleading and errors were never surfaced.
    insert_errors = client.insert_rows_from_dataframe(
        table=destination_table, dataframe=dataframe, selected_fields=schema
    )

    # Flatten the per-chunk error lists; an empty result means every row landed.
    flattened_errors = [error for chunk in insert_errors for error in chunk]
    if flattened_errors:
        print(
            f"Errors inserting rows into {destination_table_id}: {flattened_errors}"
        )
    else:
        print(
            f"Successfully appended {len(dataframe)} row(s) to {destination_table_id}"
        )