app/python/process.py:

#!/usr/bin/python
#
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
process.py - take data in raw_data bucket, process, and store in processed bucket.
"""
import csv
import json
import tempfile

from config import (
    FACETS,
    PROCESSED_DATA_BUCKET,
    RAW_DATA_BUCKET,
    RAW_DATA_FILE,
    SEGMENTS,
    logging,
    version,
)

import google.cloud.storage


def download_raw_data():
    """
    Download raw data from Cloud Storage into a local file for processing
    """
    logging.info(" download_raw_data: start downloading data")

    if RAW_DATA_BUCKET is None:
        raise ValueError("RAW_DATA_BUCKET required")
    if PROCESSED_DATA_BUCKET is None:
        raise ValueError("PROCESSED_DATA_BUCKET required")

    temp_datafile = f"{tempfile.mkdtemp()}/raw_data.csv"

    logging.info(" download_raw_data: processing from " f"{RAW_DATA_BUCKET}")

    storage_client = google.cloud.storage.Client()
    raw_bucket = storage_client.get_bucket(RAW_DATA_BUCKET)
    raw_bucket.blob(RAW_DATA_FILE).download_to_filename(temp_datafile)

    logging.info(f" download_raw_data: downloaded data to {temp_datafile}")
    return temp_datafile


def process_raw_data(temp_datafile):
    """
    Process local file, producing aggregate data
    """
    logging.info(" process_raw_data: start processing data")

    aggregate = {}
    ignored_records = 0
    counted_records = 0

    with open(temp_datafile) as f:
        csv_data = csv.DictReader(f)
        data = [row for row in csv_data]

    logging.info(f"Processing {len(data)} records from {RAW_DATA_FILE}")

    # Process each row.
    for row in data:

        # Ignore any records with incomplete data
        process = True
        for facet in FACETS:
            if row[facet] == "" or row[facet] == "?":
                ignored_records += 1
                process = False
                break  # count each incomplete record once

        if process:
            # Build aggregate identifier
            row_key = "/".join([row[f] for f in FACETS])

            # Build the base data structure on first occurrence of this key
            if row_key not in aggregate.keys():
                aggregate[row_key] = {"_counter": 0}
                for segment in SEGMENTS:
                    if segment not in aggregate[row_key].keys():
                        aggregate[row_key][segment] = 0

            # Record the relevant data
            for segment in SEGMENTS:
                if row[segment] == "true":
                    aggregate[row_key][segment] += 1

            # Increment counters
            aggregate[row_key]["_counter"] += 1
            counted_records += 1

    logging.info(
        f" process_raw_data: processed {counted_records} records,"
        f" removed {ignored_records}."
    )
    return aggregate


def write_processed_data(aggregate):
    """
    Write aggregate data to Cloud Storage
    """
    logging.info(" write_processed_data: start writing data.")

    counter = 0
    storage_client = google.cloud.storage.Client()
    processed_bucket = storage_client.get_bucket(PROCESSED_DATA_BUCKET)

    for rowkey in aggregate.keys():
        data_file = f"{rowkey}/data.json"
        facet_data = json.dumps(aggregate[rowkey])
        processed_bucket.blob(data_file).upload_from_string(facet_data)
        counter += 1

    logging.info(f" write_processed_data: wrote {counter} files.")


if __name__ == "__main__":
    logging.info(
        f"🟢 Start process.py (v{version}) with: "
        f"{RAW_DATA_BUCKET}, {PROCESSED_DATA_BUCKET}."
    )
    datafile = download_raw_data()
    result = process_raw_data(datafile)
    write_processed_data(result)
    logging.info("🏁 process.py complete.")
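
For context, process.py takes all of its settings from a local config module that is not part of this listing. Below is a minimal sketch of what such a module might provide; the environment-variable names, default values, and the example FACETS/SEGMENTS columns are assumptions for illustration, not the project's actual config.py.

# config.py -- hypothetical sketch, not the real module from this repo.
# Names match the imports in process.py; env var names, defaults, and the
# example column lists are assumed.
import logging
import os

logging.basicConfig(level=logging.INFO)

version = "0.0.0"  # assumed placeholder

# Bucket and object names, assumed to come from the environment.
RAW_DATA_BUCKET = os.environ.get("RAW_DATA_BUCKET")
PROCESSED_DATA_BUCKET = os.environ.get("PROCESSED_DATA_BUCKET")
RAW_DATA_FILE = os.environ.get("RAW_DATA_FILE", "raw_data.csv")

# Columns used to group rows (FACETS) and boolean columns to tally (SEGMENTS).
FACETS = ["country", "product"]
SEGMENTS = ["purchased", "returned"]

Given a config along these lines, each distinct combination of facet values is written to PROCESSED_DATA_BUCKET as an object named facet1/facet2/.../data.json, containing one count per segment plus a "_counter" total for the group.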