in dataplex-quickstart-labs/00-resources/scripts/pyspark/chicago-crimes-analytics/crimes_report.py [0:0]
def fnMain(logger, args):
    """Generate a report from SQL, persist it to GCS as Parquet, and (re)register its table.

    Steps: echo the inputs, start/reuse a SparkSession, run ``args.reportSQL``,
    de-duplicate the result, write it as Parquet to ``args.reportDirGcsURI``
    (overwriting), execute ``args.reportTableDDL``, run ``REFRESH TABLE`` on
    ``args.reportTableFQN``, and finally delete the ``_SUCCESS`` marker via
    ``fnDeleteSuccessFlagFile``.

    Args:
        logger: Logger exposing ``info`` and ``error``.
        args: Parsed arguments providing ``projectNbr``, ``projectID``,
            ``reportDirGcsURI``, ``reportName``, ``reportSQL``,
            ``reportPartitionCount`` (anything ``int()`` accepts),
            ``reportTableFQN`` and ``reportTableDDL``.

    A ``RuntimeError`` raised during steps 3-7 is logged via ``logger.error``
    and NOT re-raised; any other exception type propagates to the caller.
    """
    # {{ Start main
    logger.info('....Inside main')

    # 1. Capture Spark application input
    projectNbr = args.projectNbr
    projectID = args.projectID
    reportDirGcsURI = args.reportDirGcsURI
    reportName = args.reportName
    reportSQL = args.reportSQL
    reportPartitionCount = args.reportPartitionCount
    reportTableFQN = args.reportTableFQN
    reportTableDDL = args.reportTableDDL

    print("Arguments")
    print("........................")
    print(f"projectNbr: {projectNbr}")
    print(f"projectID: {projectID}")
    print(f"reportDirGcsURI: {reportDirGcsURI}")
    print(f"reportName: {reportName}")
    print(f"reportSQL: {reportSQL}")
    print(f"reportPartitionCount: {reportPartitionCount}")
    print(f"reportTableFQN: {reportTableFQN}")

    # 2. Create Spark session
    logger.info('....Initializing spark & spark configs')
    spark = SparkSession.builder.appName(f"reportName: {reportName}").getOrCreate()
    logger.info('....===================================')

    try:
        # 3. Create dataframe off of the SQL & drop duplicates.
        # DataFrames are immutable: dropDuplicates() returns a NEW DataFrame,
        # so the result must be rebound or the de-duplication is lost.
        logger.info('....creating dataframe')
        reportDF = spark.sql(reportSQL)
        reportDF = reportDF.dropDuplicates()
        logger.info('....===================================')

        # 4. Persist to the data lake as Parquet files (overwrites the directory)
        logger.info('....persisting dataframe to table')
        reportDF.repartition(int(reportPartitionCount)).write.parquet(reportDirGcsURI, mode='overwrite')
        logger.info('....completed persisting dataframe to table')
        logger.info('....===================================')

        # 5. Create the table over the written files using the supplied DDL
        logger.info('....Create table')
        print(f"Create Curated Crimes DDL: {reportTableDDL}")
        spark.sql(reportTableDDL).show(truncate=False)
        logger.info('....===================================')

        # 6. Refresh table so cached metadata/file listings pick up the new files
        logger.info('....Refresh table')
        spark.sql(f"REFRESH TABLE {reportTableFQN};").show(truncate=False)
        logger.info('....===================================')

        # 7. Remove _SUCCESS file
        logger.info('....Deleting _SUCCESS')
        fnDeleteSuccessFlagFile(reportDirGcsURI)
        logger.info('....===================================')
    except RuntimeError as coreError:
        # NOTE(review): only RuntimeError is caught, and it is swallowed, so the
        # job can exit normally after a failure. Spark SQL errors are generally
        # not RuntimeError subclasses and will propagate. Consider re-raising
        # so orchestration sees the failure — confirm with the job's callers.
        logger.error(coreError)
    else:
        logger.info(f"Successfully completed generating {reportName}!")