dataplex-quickstart-labs/00-resources/scripts/pyspark/chicago-crimes-analytics/crimes_report.py (110 lines of code) (raw):

# ............................................................ # Create Crime Trend Report # ............................................................ import sys,logging,argparse import pyspark from pyspark.sql import SparkSession from pyspark.sql.functions import * from datetime import datetime from google.cloud import storage def fnParseArguments(): # {{ Start """ Purpose: Parse arguments received by script Returns: args """ argsParser = argparse.ArgumentParser() argsParser.add_argument( '--projectNbr', help='The project number', required=True) argsParser.add_argument( '--projectID', help='The project id', type=str, required=True) argsParser.add_argument( '--reportDirGcsURI', help='The GCS URI for the report', required=True) argsParser.add_argument( '--reportName', help='The report name', required=True) argsParser.add_argument( '--reportSQL', help='The report SQL', required=True) argsParser.add_argument( '--reportPartitionCount', help='The spark partition count', required=True) argsParser.add_argument( '--reportTableFQN', help='The report table FQN', required=True) argsParser.add_argument( '--reportTableDDL', help='The report table DDL', required=True) return argsParser.parse_args() # }} End fnParseArguments() def fnMain(logger, args): # {{ Start main logger.info('....Inside main') # 1. Capture Spark application input projectNbr=args.projectNbr projectID=args.projectID reportDirGcsURI=args.reportDirGcsURI reportName=args.reportName reportSQL=args.reportSQL reportPartitionCount=args.reportPartitionCount reportTableFQN=args.reportTableFQN reportTableDDL=args.reportTableDDL print("Arguments") print("........................") print(f"projectNbr: {projectNbr}") print(f"projectID: {projectID}") print(f"reportDirGcsURI: {reportDirGcsURI}") print(f"reportName: {reportName}") print(f"reportSQL: {reportSQL}") print(f"reportPartitionCount: {reportPartitionCount}") print(f"reportTableFQN: {reportTableFQN}") # 2. Create Spark session logger.info('....Initializing spark & spark configs') spark = SparkSession.builder.appName(f"reportName: {reportName}").getOrCreate() logger.info('....===================================') try: # 3. Drop table if exists #logger.info('....drop table if exists') #spark.sql(f"DROP TABLE IF EXISTS {reportTableFQN}").show(truncate=False) #logger.info('....===================================') # 4. Create dataframe off of the SQL & drop duplicates logger.info('....creating dataframe') reportDF = spark.sql(reportSQL) reportDF.dropDuplicates() logger.info('....===================================') # 5. Persist to the data lake as a table in the curated zone logger.info('....persisting dataframe to table') reportDF.repartition(int(reportPartitionCount)).write.parquet(reportDirGcsURI, mode='overwrite') logger.info('....completed persisting dataframe to table') logger.info('....===================================') # 6. Create external table logger.info('....Create table') print(f"Create Curated Crimes DDL: {reportTableDDL}") spark.sql(reportTableDDL).show(truncate=False) logger.info('....===================================') # 7. Refresh table logger.info('....Refresh table') spark.sql(f"REFRESH TABLE {reportTableFQN};").show(truncate=False) logger.info('....===================================') # 8. Remove _SUCCESS file logger.info('....Deleting _SUCCESS') fnDeleteSuccessFlagFile(reportDirGcsURI) logger.info('....===================================') except RuntimeError as coreError: logger.error(coreError) else: logger.info(f"Successfully completed generating {reportName}!") # }} End fnMain() def fnDeleteSuccessFlagFile(bucket_uri): # {{ Start """Deletes a blob from the bucket.""" # bucket_name = "your-bucket-name" # blob_name = "your-object-name" storage_client = storage.Client() bucket_name = bucket_uri.split("/")[2] object_name = "/".join(bucket_uri.split("/")[3:]) print(f"Bucket name: {bucket_name}") print(f"Object name: {object_name}") bucket = storage_client.bucket(bucket_name) blob = bucket.blob(f"{object_name}/_SUCCESS") blob.delete() print(f"_SUCCESS file deleted.") # }} End def fnConfigureLogger(): # {{ Start """ Purpose: Configure a logger for the script Returns: Logger object """ logFormatter = logging.Formatter('%(asctime)s - %(filename)s - %(levelname)s - %(message)s') logger = logging.getLogger("data_engineering") logger.setLevel(logging.INFO) logger.propagate = False logStreamHandler = logging.StreamHandler(sys.stdout) logStreamHandler.setFormatter(logFormatter) logger.addHandler(logStreamHandler) return logger # }} End fnConfigureLogger() if __name__ == "__main__": arguments = fnParseArguments() logger = fnConfigureLogger() fnMain(logger, arguments)