# dataplex-quickstart-labs/00-resources/scripts/pyspark/chicago-crimes-analytics/crimes_report.py
# Entry point: fnMain(logger, args)

def fnMain(logger, args):
# {{ Start main
    """Generate a crimes report: run the report SQL, de-duplicate, persist the
    result as Parquet in the curated zone, and (re)create/refresh the external
    table over it.

    Args:
        logger: A configured logging.Logger instance.
        args: Parsed argparse namespace with attributes projectNbr, projectID,
            reportDirGcsURI, reportName, reportSQL, reportPartitionCount,
            reportTableFQN, reportTableDDL.
    """

    logger.info('....Inside main')

    # 1. Capture Spark application input
    projectNbr=args.projectNbr
    projectID=args.projectID
    reportDirGcsURI=args.reportDirGcsURI
    reportName=args.reportName
    reportSQL=args.reportSQL
    reportPartitionCount=args.reportPartitionCount
    reportTableFQN=args.reportTableFQN
    reportTableDDL=args.reportTableDDL

    print("Arguments")
    print("........................")
    print(f"projectNbr: {projectNbr}")
    print(f"projectID: {projectID}")
    print(f"reportDirGcsURI: {reportDirGcsURI}")
    print(f"reportName: {reportName}")
    print(f"reportSQL: {reportSQL}")
    print(f"reportPartitionCount: {reportPartitionCount}")
    print(f"reportTableFQN: {reportTableFQN}")


    # 2. Create Spark session
    logger.info('....Initializing spark & spark configs')
    spark = SparkSession.builder.appName(f"reportName: {reportName}").getOrCreate()
    logger.info('....===================================')

    try:

        # 3. Drop table if exists
        #logger.info('....drop table if exists')
        #spark.sql(f"DROP TABLE IF EXISTS {reportTableFQN}").show(truncate=False)
        #logger.info('....===================================')

        # 4. Create dataframe off of the SQL & drop duplicates
        logger.info('....creating dataframe')
        # BUG FIX: dropDuplicates() returns a NEW DataFrame; the original code
        # discarded the result, so duplicates were never removed. Rebind it.
        reportDF = spark.sql(reportSQL).dropDuplicates()
        logger.info('....===================================')

        # 5. Persist to the data lake as a table in the curated zone
        logger.info('....persisting dataframe to table')
        reportDF.repartition(int(reportPartitionCount)).write.parquet(reportDirGcsURI, mode='overwrite')
        logger.info('....completed persisting dataframe to table')
        logger.info('....===================================')

        # 6. Create external table
        logger.info('....Create table')
        print(f"Create Curated Crimes DDL: {reportTableDDL}")
        spark.sql(reportTableDDL).show(truncate=False)
        logger.info('....===================================')

        # 7. Refresh table so the metastore picks up the freshly-written files.
        # NOTE: trailing semicolons inside spark.sql() raise ParseException on
        # some Spark versions, so the statement is passed without one.
        logger.info('....Refresh table')
        spark.sql(f"REFRESH TABLE {reportTableFQN}").show(truncate=False)
        logger.info('....===================================')

        # 8. Remove the _SUCCESS marker file Spark writes alongside the output
        logger.info('....Deleting _SUCCESS')
        fnDeleteSuccessFlagFile(reportDirGcsURI)
        logger.info('....===================================')


    except RuntimeError as coreError:
        logger.error(coreError)
    else:
        logger.info(f"Successfully completed generating {reportName}!")