fraud-detection-python/cloud-functions/geocode-addresses/main.py (57 lines of code) (raw):
# mypy: disable-error-code="1"
"""
Sends address data from Invoices to Geocode API and writes to BigQuery
"""
import base64
import json
import os
from urllib.parse import urlencode
from google.api_core.exceptions import GoogleAPICallError
from google.cloud import bigquery
import requests
DATASET_NAME = "invoice_parser_results"
TABLE_NAME = "geocode_details"
bq_client = bigquery.Client()
def write_to_bq(dataset_name, table_name, geocode_response_dict):
"""
Write Geocode Data to BigQuery
"""
dataset_ref = bq_client.dataset(dataset_name)
table_ref = dataset_ref.table(table_name)
row_to_insert = []
row_to_insert.append(geocode_response_dict)
json_data = json.dumps(row_to_insert, sort_keys=False)
json_object = json.loads(json_data)
job_config = bigquery.LoadJobConfig(
source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
)
try:
job = bq_client.load_table_from_json(
json_object, table_ref, job_config=job_config
)
result = job.result() # Waits for table load to complete.
print(result)
except GoogleAPICallError as ex:
print(ex)
# pylint: disable=unused-argument
def process_address(event, context):
"""Triggered from a message on a Cloud Pub/Sub topic.
Args:
event (dict): Event payload.
context (google.cloud.functions.Context): Metadata for the event.
"""
pubsub_message = base64.b64decode(event["data"]).decode("utf-8")
message_dict = json.loads(pubsub_message)
query_address = message_dict.get("entity_text")
geocode_dict = {}
geocode_dict["input_file_name"] = message_dict.get("input_file_name")
geocode_dict["entity_type"] = message_dict.get("entity_type")
geocode_dict["entity_text"] = query_address
geocode_response_dict = extract_geocode_info(query_address)
geocode_dict.update(geocode_response_dict)
print(geocode_dict)
write_to_bq(DATASET_NAME, TABLE_NAME, geocode_dict)
def extract_geocode_info(query_address, data_type="json") -> dict:
"""
Extract Geocode Data from Google Maps API
"""
geocode_response_dict = {}
endpoint = f"https://maps.googleapis.com/maps/api/geocode/{data_type}"
api_key = os.environ.get("API_key")
params = {"address": query_address, "key": api_key}
url_params = urlencode(params)
try:
request = requests.get(endpoint, params=url_params)
results = request.json()["results"][0]
location = results["geometry"]["location"]
geocode_response_dict["place_id"] = results["place_id"]
geocode_response_dict["formatted_address"] = results["formatted_address"]
geocode_response_dict["lat"] = str(location.get("lat"))
geocode_response_dict["lng"] = str(location.get("lng"))
except requests.exceptions.RequestException as ex:
print(ex)
return geocode_response_dict