dataplex-quickstart-labs/00-resources/scripts/pyspark/chicago-crimes-analytics/curate_crimes.py (91 lines of code) (raw):

# ............................................................
# Curate Chicago Crimes
# ............................................................
# This script -
# 1. subsets raw Chicago crimes data
# 2. augments with temporal attributes
# 3. persists to GCS in the curated zone as parquet
# 4. creates an external table in HMS over #3
# ............................................................

import sys
import logging
import argparse

import pyspark
from pyspark.sql import SparkSession
# NOTE: wildcard import retained from the original script; it is kept ahead of
# the imports below so it cannot shadow them.
from pyspark.sql.functions import *
from datetime import datetime
from google.cloud import storage


def fnParseArguments():
    # {{ Start
    """
    Purpose:
        Parse arguments received by script
    Returns:
        args (argparse.Namespace with projectID, tableFQN, peristencePath)
    """
    argsParser = argparse.ArgumentParser()
    argsParser.add_argument(
        '--projectID',
        help='Project ID',
        type=str,
        required=True)
    argsParser.add_argument(
        '--tableFQN',
        help='Table fully qualified name',
        type=str,
        required=True)
    # The flag spelling ("peristence") is part of the script's CLI contract
    # and is kept as-is so existing job submissions continue to work.
    argsParser.add_argument(
        '--peristencePath',
        help='GCS location',
        type=str,
        required=True)
    return argsParser.parse_args()
# }} End fnParseArguments()


def fnDeleteSuccessFlagFile(bucket_uri):
    # {{ Start
    """
    Purpose:
        Delete the _SUCCESS marker object located directly under the
        given GCS location
    Args:
        bucket_uri: URI of the form gs://<bucket>/<prefix>, with or
                    without a trailing slash
    Returns:
        None
    Note:
        blob.delete() is not guarded; if the marker object does not exist
        the storage client's error propagates to the caller.
    """
    storage_client = storage.Client()

    # "gs://bucket/a/b/".split("/") -> ["gs:", "", "bucket", "a", "b", ""]
    uri_parts = bucket_uri.split("/")
    bucket_name = uri_parts[2]
    object_name = "/".join(uri_parts[3:])
    print(f"Bucket name: {bucket_name}")
    print(f"Object name: {object_name}")

    # Normalise the prefix so the marker resolves to "<prefix>/_SUCCESS"
    # whether or not the URI ended with "/". Previously a URI without a
    # trailing slash produced "<prefix>_SUCCESS", which is not the marker.
    prefix = object_name.rstrip("/")
    success_blob_name = f"{prefix}/_SUCCESS" if prefix else "_SUCCESS"

    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(success_blob_name)
    blob.delete()
    print("_SUCCESS file deleted.")
# }} End


def fnMain(logger, args):
    # {{ Start main
    """
    Purpose:
        Read raw crimes from BigQuery, derive temporal attributes,
        de-duplicate, persist as parquet and register a table over it
    Args:
        logger: logger used for progress messages
        args: parsed arguments exposing projectID, tableFQN, peristencePath
    Returns:
        None
    Raises:
        RuntimeError: logged and re-raised so the job exits non-zero
        instead of reporting success after a failed run. Other exception
        types propagate unchanged.
    """

    # 1. Capture Spark application input
    projectID = args.projectID
    tableFQN = args.tableFQN
    peristencePath = args.peristencePath

    # 2. Create Spark session
    logger.info('....Initializing spark & spark configs')
    spark = SparkSession.builder.appName("Curate Chicago Crimes").getOrCreate()
    logger.info('....===================================')

    # 3. Create curated crimes SQL
    # 3.1. Read data from BigQuery
    logger.info('....Creating a base DF off of a BigQuery table')
    baseDF = spark.read \
        .format('bigquery') \
        .load(f'{projectID}.oda_raw_zone.crimes_raw')
    logger.info('....===================================')

    # 3.2. Register temp table
    logger.info('....Creating a temp table')
    baseDF.createOrReplaceTempView("crimes_raw")
    # count() is a Spark action; log the result so the scan is not wasted.
    rawRowCount = baseDF.count()
    logger.info('....Raw row count: %s', rawRowCount)
    logger.info('....===================================')

    # 3.3. Then create the curate crimes SQL
    curatedCrimesSQL = (
        "SELECT case_number,primary_type as case_type,date as case_date,year AS case_year,"
        "date_format(date, 'MMM') AS case_month,date_format(date,'E') AS case_day_of_week, "
        "hour(date) AS case_hour_of_day FROM crimes_raw;"
    )
    print(f"Curated Crimes SQL: {curatedCrimesSQL}")
    logger.info('....===================================')

    try:
        # 4. Drop table if exists - disabled in this script; the table is
        #    created with IF NOT EXISTS and refreshed below instead.

        # 5. Curate crimes
        logger.info('....Creating dataframe')
        # DataFrames are immutable: dropDuplicates() returns a new DataFrame,
        # so its result must be kept or the de-duplication is a no-op.
        curatedCrimesDF = spark.sql(curatedCrimesSQL).dropDuplicates()
        curatedRowCount = curatedCrimesDF.count()
        logger.info('....Curated row count: %s', curatedRowCount)
        logger.info('....===================================')

        # 6. Persist to the data lake bucket in the curated zone
        logger.info('....Persisting dataframe in overwrite mode')
        print(f"peristencePath is {peristencePath}")
        # NOTE(review): 17 output partitions is carried over from the
        # original script; the rationale is not visible here.
        curatedCrimesDF.repartition(17).write.parquet(peristencePath, mode='overwrite')
        logger.info('....===================================')

        # 7. Create table definition
        logger.info('....Create table')
        CREATE_TABLE_DDL = (
            f"CREATE TABLE IF NOT EXISTS {tableFQN}(case_number string, case_type string,"
            "case_date timestamp, case_year long, case_month string, "
            "case_day_of_week string, case_hour_of_day integer) "
            f"STORED AS PARQUET LOCATION \"{peristencePath}\";"
        )
        print(f"Create Curated Crimes DDL: {CREATE_TABLE_DDL}")
        spark.sql(CREATE_TABLE_DDL).show(truncate=False)
        logger.info('....===================================')

        # 8. Refresh table
        logger.info('....Refresh table')
        spark.sql(f"REFRESH TABLE {tableFQN};").show(truncate=False)
        logger.info('....===================================')

        # 9. Remove _SUCCESS file
        logger.info('....Deleting _SUCCESS')
        fnDeleteSuccessFlagFile(peristencePath)
        logger.info('....===================================')

    except RuntimeError as coreError:
        logger.error(coreError)
        # Re-raise so the failure is visible to whatever launched the job;
        # swallowing it here made a failed run look successful.
        raise
    else:
        logger.info('Successfully completed curating Chicago crimes!')
# }} End fnMain()


def fnConfigureLogger():
    # {{ Start
    """
    Purpose:
        Configure a logger for the script
    Returns:
        Logger object named "data_engineering" that writes INFO and above
        to stdout and does not propagate to the root logger
    """
    logFormatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("data_engineering")
    logger.setLevel(logging.INFO)
    logger.propagate = False

    # logging.getLogger returns the same object for the same name, so only
    # attach the stdout handler once; otherwise a repeated call would emit
    # every message multiple times.
    if not logger.handlers:
        logStreamHandler = logging.StreamHandler(sys.stdout)
        logStreamHandler.setFormatter(logFormatter)
        logger.addHandler(logStreamHandler)
    return logger
# }} End fnConfigureLogger()


if __name__ == "__main__":
    arguments = fnParseArguments()
    logger = fnConfigureLogger()
    fnMain(logger, arguments)