# export_data_validation_metrics_to_bigquery()
#
# in jobs/search-term-data-validation-v2/search_term_data_validation_v2/data_validation.py [0:0]


def export_data_validation_metrics_to_bigquery(dataframe, destination_table_id):
    """
    Append data validation metrics to the BigQuery table tracking these metrics from job metadata.

    Arguments:
    - dataframe: A dataframe of validation metrics to be added. Must contain a
      string `finished_at` column plus the float metric columns listed in the
      schema below.
    - destination_table_id: the fully qualified name of the table for the data to be exported into.

    Returns: Nothing.
    It does print the insert errors as a cursory logging mechanism: the streaming
    insert returns a list of per-row error lists, and empty lists mean every row
    was appended successfully. That result can be parsed and logged to wherever
    we like.
    """
    # NOTE(review): `project` is presumably a module-level constant defined at
    # the top of this file — confirm it is in scope.
    client = bigquery.Client(project=project)

    # Explicit schema so pandas dtypes are coerced to the expected BigQuery
    # column types. All metric columns share the FLOAT64 type, so build them
    # from a single name list instead of eight copy-pasted declarations.
    float_metric_columns = [
        "pct_sanitized_search_terms",
        "pct_sanitized_contained_at",
        "pct_sanitized_contained_numbers",
        "pct_sanitized_contained_name",
        "pct_terms_containing_us_census_surname",
        "pct_uppercase_chars_all_search_terms",
        "avg_words_all_search_terms",
        "pct_terms_non_english",
    ]
    schema = [
        bigquery.SchemaField("finished_at", bigquery.enums.SqlTypeNames.STRING)
    ] + [
        bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.FLOAT64)
        for column in float_metric_columns
    ]

    destination_table = bigquery.Table(destination_table_id)
    # insert_rows_from_dataframe performs a streaming insert and returns a list
    # of per-row-chunk error lists — it is NOT a load job, so don't call it one.
    errors = client.insert_rows_from_dataframe(
        table=destination_table, dataframe=dataframe, selected_fields=schema
    )

    # Cursory logging: an all-empty nested list means the insert succeeded.
    print(errors)