def main()

in sql/moz-fx-data-shared-prod/external_derived/population_v1/query.py [0:0]


# Ordered (column name, BigQuery type) pairs describing the population table.
# This single definition drives three things that must stay in lockstep:
#   * the column order of the CSV written to GCS,
#   * which columns are cast to int before writing,
#   * the schema handed to the BigQuery load job.
# The load job skips the CSV header row, so the order here is what lines the
# CSV columns up with the schema fields.
_POPULATION_COLUMNS = (
    ("location_id", "INTEGER"),
    ("location", "STRING"),
    ("iso3_country_code", "STRING"),
    ("iso2_country_code", "STRING"),
    ("location_type_id", "INTEGER"),
    ("indicator_id", "INTEGER"),
    ("indicator", "STRING"),
    ("indicator_display_name", "STRING"),
    ("source_id", "INTEGER"),
    ("source", "STRING"),
    ("revision", "INTEGER"),
    ("variant_id", "INTEGER"),
    ("variant", "STRING"),
    ("variant_short_name", "STRING"),
    ("variant_label", "STRING"),
    ("time_id", "INTEGER"),
    ("time_label", "STRING"),
    ("time_mid", "STRING"),
    ("category_id", "INTEGER"),
    ("category", "STRING"),
    ("estimate_type_id", "INTEGER"),
    ("estimate_type", "STRING"),
    ("estimate_method_id", "INTEGER"),
    ("estimate_method", "STRING"),
    ("sex_id", "INTEGER"),
    ("sex", "STRING"),
    ("age_id", "INTEGER"),
    ("age_label", "STRING"),
    ("age_start", "INTEGER"),
    ("age_end", "INTEGER"),
    ("age_mid", "NUMERIC"),
    ("value", "NUMERIC"),
    ("last_updated", "DATE"),
)


def main():
    """Call the API, save data to GCS, delete any data already in table for same year, then load to BQ table.

    Command-line arguments:
        --date: required, the DAG logical date formatted as YYYY-MM-DD.

    Steps:
        1. Pull population data for every id in ``LOC_IDS_OF_INTEREST`` for the
           year *after* the logical date's year.
        2. Write the combined rows as a CSV under ``GCS_BUCKET``.
        3. Delete rows for that year from the BigQuery population table.
        4. Append the CSV to ``TARGET_TABLE``.

    Raises:
        ValueError: if ``--date`` is not a valid YYYY-MM-DD date, or if an
            integer column contains a value that cannot be cast to int.
        KeyError: if an integer column is missing from the combined data.

    NOTE(review): the delete and the load are two separate jobs. If the load
    fails after the delete has succeeded, the table holds no rows for that
    year until the task is rerun.
    """
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--date", required=True)
    args = parser.parse_args()
    logical_dag_date = datetime.strptime(args.date, "%Y-%m-%d").date()
    logical_dag_date_string = logical_dag_date.strftime("%Y-%m-%d")

    # Wall-clock date of this run, stamped on every row as last_updated
    curr_date = datetime.today().strftime("%Y-%m-%d")

    # The year pulled is the logical date's year PLUS ONE, not the logical
    # year itself.
    # NOTE(review): presumably because the DAG runs 1x a year and the logical
    # date marks the start of the interval being processed - confirm.
    year_of_interest = str(logical_dag_date.year + 1)
    print(f"Pulling data for year: {year_of_interest}")

    column_names = [name for name, _ in _POPULATION_COLUMNS]
    integer_columns = [
        name for name, bq_type in _POPULATION_COLUMNS if bq_type == "INTEGER"
    ]

    # Start with an empty template frame so the expected columns come first,
    # in table order, regardless of the column order the API helper returns.
    frames = [pd.DataFrame({name: [] for name in column_names})]

    # Collect one frame per location and concatenate once at the end;
    # concatenating inside the loop re-copies all accumulated rows each time.
    frames.extend(
        pull_population_data(year_of_interest, loc_id, INDICATOR_ID_OF_INTEREST)
        for loc_id in LOC_IDS_OF_INTEREST
    )
    full_results_df = pd.concat(frames)

    # Enforce data types: every INTEGER column of the table is cast to int
    full_results_df = full_results_df.astype({name: int for name in integer_columns})

    # Add last updated date
    full_results_df["last_updated"] = curr_date

    # Calculate GCS filepath to write to and then write CSV to that filepath
    fpath = (
        GCS_BUCKET
        + f"UN_Population_Data/pop_data_year_{year_of_interest}_as_of_{logical_dag_date_string}.csv"
    )
    full_results_df.to_csv(fpath, index=False)

    # Open a connection to BQ
    client = bigquery.Client(TARGET_PROJECT)

    # Delete any data already in table for same year so we don't end up with duplicates.
    # year_of_interest is produced by str(int + 1) above, so it can only contain
    # digits (and a sign); interpolating it into the SQL text is safe.
    delete_query = f"""DELETE FROM `moz-fx-data-shared-prod.external_derived.population_v1`
  WHERE time_label = '{year_of_interest}'"""
    del_job = client.query(delete_query)
    del_job.result()

    # Load data from GCS to BQ table - appending to what is already there
    load_job_config = bigquery.LoadJobConfig(
        create_disposition="CREATE_NEVER",
        write_disposition="WRITE_APPEND",
        schema=[
            {"name": name, "type": bq_type, "mode": "NULLABLE"}
            for name, bq_type in _POPULATION_COLUMNS
        ],
        skip_leading_rows=1,
        source_format=bigquery.SourceFormat.CSV,
    )
    load_csv_to_gcp_job = client.load_table_from_uri(
        fpath,
        TARGET_TABLE,
        job_config=load_job_config,
    )

    # Block until the load finishes so a failed load fails the task
    load_csv_to_gcp_job.result()