backfill/2024-03-23-clients_last_seen/telemetry_derived_clients_last_seen_v2_20240322/backfill_clients_last_seen_v2.py [234:295]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
)


def get_bigquery_schema(schema_yaml):
    """Convert parsed schema YAML field entries into BigQuery schema dicts.

    Each entry is reduced to its ``mode``, ``type`` and ``name`` keys; any
    other keys on the entry are not copied. Entries whose type is ``RECORD``
    additionally get a ``fields`` list built by converting their own
    ``fields`` entries the same way, so RECORDs nested at any depth keep
    their sub-fields.

    Args:
        schema_yaml: Iterable of mappings, each with ``mode``, ``type`` and
            ``name`` keys, plus ``fields`` when the type is ``RECORD``.

    Returns:
        A new list of dicts, one per entry, in the input order.

    Raises:
        KeyError: If an entry lacks ``mode``, ``type`` or ``name``, or if a
            ``RECORD`` entry (at any depth) lacks ``fields``.
    """
    bigquery_schema = []
    for item in schema_yaml:
        column = {
            'mode': item['mode'],
            'type': item['type'],
            'name': item['name'],
        }
        if item['type'] == 'RECORD':
            # Recurse rather than copying one level only: a RECORD nested
            # inside another RECORD must carry its own sub-fields too.
            column['fields'] = get_bigquery_schema(item['fields'])
        bigquery_schema.append(column)
    return bigquery_schema


def _backfill_staging_table(client, job_config, project_id, dataset, destination_table, bigquery_schema, submission_date, sample_id):
    """Backfill for a submission_date, sample_id combination."""
    full_table_id = f"{project_id}.{dataset}.{destination_table}_{sample_id}"
    try:
        table = client.get_table(full_table_id)
    except NotFound:
        table = bigquery.Table(full_table_id)
        table.schema = bigquery_schema
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='submission_date'
        )
        table.clustering_fields =['normalized_channel', 'sample_id']

    if not table.created:
        client.create_table(table)
        click.echo(f"Destination table {full_table_id} created.")
    else:
        client.update_table(table, ["schema"])

    if (
            partition := get_backfill_partition(
                submission_date,
                "submission_date",
                0,
                PartitionType.DAY
            )
        ) is not None:
        dest_table = f"{destination_table}_{sample_id}${partition}"

    print(f"Running a backfill for sample_id={sample_id} to {dest_table}")

    arguments = (
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



backfill/2024-03-23-clients_last_seen/telemetry_derived_clients_last_seen_v2_20240322/backfill_clients_last_seen_v2_in_BQproject_backfill_1.py [250:311]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
)


def get_bigquery_schema(schema_yaml):
    """Convert parsed schema YAML field entries into BigQuery schema dicts.

    Each entry is reduced to its ``mode``, ``type`` and ``name`` keys; any
    other keys on the entry are not copied. Entries whose type is ``RECORD``
    additionally get a ``fields`` list built by converting their own
    ``fields`` entries the same way, so RECORDs nested at any depth keep
    their sub-fields.

    Args:
        schema_yaml: Iterable of mappings, each with ``mode``, ``type`` and
            ``name`` keys, plus ``fields`` when the type is ``RECORD``.

    Returns:
        A new list of dicts, one per entry, in the input order.

    Raises:
        KeyError: If an entry lacks ``mode``, ``type`` or ``name``, or if a
            ``RECORD`` entry (at any depth) lacks ``fields``.
    """
    bigquery_schema = []
    for item in schema_yaml:
        column = {
            'mode': item['mode'],
            'type': item['type'],
            'name': item['name'],
        }
        if item['type'] == 'RECORD':
            # Recurse rather than copying one level only: a RECORD nested
            # inside another RECORD must carry its own sub-fields too.
            column['fields'] = get_bigquery_schema(item['fields'])
        bigquery_schema.append(column)
    return bigquery_schema


def _backfill_staging_table(client, job_config, project_id, dataset, destination_table, bigquery_schema, submission_date, sample_id):
    """Backfill for a submission_date, sample_id combination."""
    full_table_id = f"{project_id}.{dataset}.{destination_table}_{sample_id}"
    try:
        table = client.get_table(full_table_id)
    except NotFound:
        table = bigquery.Table(full_table_id)
        table.schema = bigquery_schema
        table.time_partitioning = bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field='submission_date'
        )
        table.clustering_fields =['normalized_channel', 'sample_id']

    if not table.created:
        client.create_table(table)
        click.echo(f"Destination table {full_table_id} created.")
    else:
        client.update_table(table, ["schema"])

    if (
            partition := get_backfill_partition(
                submission_date,
                "submission_date",
                0,
                PartitionType.DAY
            )
        ) is not None:
        dest_table = f"{destination_table}_{sample_id}${partition}"

    print(f"Running a backfill for sample_id={sample_id} to {dest_table}")

    arguments = (
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



