in sql/moz-fx-data-shared-prod/external_derived/population_v1/query.py [0:0]
def main():
    """Call the API, save data to GCS, delete any data already in table for same year, then load to BQ table."""
    parser = ArgumentParser(description=__doc__)
    parser.add_argument("--date", required=True)
    args = parser.parse_args()
    logical_dag_date = datetime.strptime(args.date, "%Y-%m-%d").date()
    logical_dag_date_string = logical_dag_date.strftime("%Y-%m-%d")

    # Stamp each row with the actual run date (may differ from the DAG logical date)
    curr_date = datetime.today().strftime("%Y-%m-%d")

    # Calculate year of interest from the DAG run date
    # DAG runs 1x a year, so this gets the year after the DAG logical date
    year_of_interest = str(logical_dag_date.year + 1)
    print(f"Pulling data for year: {year_of_interest}")

    # Single source of truth for column names and BigQuery types.
    # Order matters: it defines both the CSV column order and the load schema.
    schema_fields = [
        ("location_id", "INTEGER"),
        ("location", "STRING"),
        ("iso3_country_code", "STRING"),
        ("iso2_country_code", "STRING"),
        ("location_type_id", "INTEGER"),
        ("indicator_id", "INTEGER"),
        ("indicator", "STRING"),
        ("indicator_display_name", "STRING"),
        ("source_id", "INTEGER"),
        ("source", "STRING"),
        ("revision", "INTEGER"),
        ("variant_id", "INTEGER"),
        ("variant", "STRING"),
        ("variant_short_name", "STRING"),
        ("variant_label", "STRING"),
        ("time_id", "INTEGER"),
        ("time_label", "STRING"),
        ("time_mid", "STRING"),
        ("category_id", "INTEGER"),
        ("category", "STRING"),
        ("estimate_type_id", "INTEGER"),
        ("estimate_type", "STRING"),
        ("estimate_method_id", "INTEGER"),
        ("estimate_method", "STRING"),
        ("sex_id", "INTEGER"),
        ("sex", "STRING"),
        ("age_id", "INTEGER"),
        ("age_label", "STRING"),
        ("age_start", "INTEGER"),
        ("age_end", "INTEGER"),
        ("age_mid", "NUMERIC"),
        ("value", "NUMERIC"),
        ("last_updated", "DATE"),
    ]

    # Seed with an empty frame so the output always has every column, even if
    # the API returns nothing for every location
    frames = [pd.DataFrame({name: [] for name, _ in schema_fields})]
    # For each location, get the population data as a dataframe
    for loc_id in LOC_IDS_OF_INTEREST:
        frames.append(
            pull_population_data(year_of_interest, loc_id, INDICATOR_ID_OF_INTEREST)
        )
    # Concatenate once after the loop (concat-per-iteration is quadratic)
    full_results_df = pd.concat(frames)

    # Enforce data types: every column BigQuery expects as INTEGER is cast to int
    for name, bq_type in schema_fields:
        if bq_type == "INTEGER":
            full_results_df[name] = full_results_df[name].astype(int)

    # Add last updated date
    full_results_df["last_updated"] = curr_date

    # Calculate GCS filepath to write to and then write CSV to that filepath
    fpath = (
        GCS_BUCKET
        + f"UN_Population_Data/pop_data_year_{year_of_interest}_as_of_{logical_dag_date_string}.csv"
    )
    full_results_df.to_csv(fpath, index=False)

    # Open a connection to BQ
    client = bigquery.Client(TARGET_PROJECT)

    # Delete any data already in table for same year so we don't end up with
    # duplicates. Use a query parameter instead of f-string interpolation so
    # the year value can never alter the SQL.
    delete_query = """DELETE FROM `moz-fx-data-shared-prod.external_derived.population_v1`
WHERE time_label = @year_of_interest"""
    del_job = client.query(
        delete_query,
        job_config=bigquery.QueryJobConfig(
            query_parameters=[
                bigquery.ScalarQueryParameter(
                    "year_of_interest", "STRING", year_of_interest
                )
            ]
        ),
    )
    del_job.result()

    # Load data from GCS to BQ table - appending to what is already there
    load_csv_to_gcp_job = client.load_table_from_uri(
        fpath,
        TARGET_TABLE,
        job_config=bigquery.LoadJobConfig(
            create_disposition="CREATE_NEVER",
            write_disposition="WRITE_APPEND",
            schema=[
                {"name": name, "type": bq_type, "mode": "NULLABLE"}
                for name, bq_type in schema_fields
            ],
            skip_leading_rows=1,
            source_format=bigquery.SourceFormat.CSV,
        ),
    )
    load_csv_to_gcp_job.result()