in jobs/quicksuggest2bq/quicksuggest2bq/main.py [0:0]
def download_suggestions(client: kinto_http.Client) -> Iterator[KintoSuggestion]:
    """Get records, download attachments and yield the suggestions.

    The attachment base URL is read from the server info exposed by
    ``client``. Every record whose ``type`` is ``"data"`` or
    ``"offline-expansion-data"`` has its attachment downloaded, and each
    suggestion object found in the attachment is yielded as a
    ``KintoSuggestion``.

    Attachments that do not come back with HTTP status 200 are logged and
    skipped; they do not abort the iteration.

    Args:
        client: Kinto client used to read the server info and the records.

    Yields:
        One ``KintoSuggestion`` per suggestion object in each successfully
        downloaded attachment.
    """
    # Retrieve the base_url for attachments
    server_info = client.server_info()
    attachments_base_url = server_info["capabilities"]["attachments"]["base_url"]

    # Load records for both "type: data" and "type: offline-expansion-data".
    # See details in: https://mozilla-hub.atlassian.net/browse/CONSVC-1818
    # ``.get`` is used so that a record without a "type" field is skipped
    # like any other non-data record instead of raising KeyError.
    data_records = [
        record
        for record in client.get_records()
        if record.get("type") in ("data", "offline-expansion-data")
    ]

    # Make use of connection pooling because all requests go to the same host.
    # The context manager closes the session (and its pooled connections) once
    # the generator is exhausted or closed early by the consumer.
    with requests.Session() as requests_session:
        for record in data_records:
            attachment_url = (
                f"{attachments_base_url}{record['attachment']['location']}"
            )
            response = requests_session.get(attachment_url)

            if response.status_code != 200:
                # Ignore unsuccessful requests for now
                logging.error(
                    (
                        "Failed to download attachment for record with ID '%s'."
                        " Response status code %s."
                    ),
                    record["id"],
                    response.status_code,
                )
                continue

            # Each attachment is a list of suggestion objects and each
            # suggestion object contains a list of keywords. Load the
            # suggestions into pydantic model instances to discard all fields
            # which we don't care about here.
            for suggestion_data in response.json():
                suggestion: Dict[str, Any] = {
                    **suggestion_data,
                    # Each "full_keywords" entry is unpacked as a
                    # (keyword, count) pair and turned into a mapping.
                    "full_keywords": [
                        {"keyword": kw, "count": count}
                        for kw, count in suggestion_data.get("full_keywords", [])
                    ],
                }
                yield KintoSuggestion(**suggestion)