# dataplex-quickstart-labs/00-resources/scripts/pyspark/chicago-crimes-analytics/curate_crimes.py

from pyspark.sql import SparkSession
def fnMain(logger, args):
    # {{ Start main
    # 1. Capture Spark application input
    projectID = args.projectID
    tableFQN = args.tableFQN
    # The CLI argument name is misspelled where the parser is defined
    # (outside this excerpt); keep the attribute as-is but use a correctly
    # spelled local variable.
    persistencePath = args.peristencePath

    # 2. Create Spark session
    logger.info('....Initializing spark & spark configs')
    spark = SparkSession.builder.appName("Curate Chicago Crimes").getOrCreate()
    logger.info('....===================================')
    # 3. Create curated crimes SQL
    # 3.1. Read data from BigQuery
    logger.info('....Creating a base DF off of a BigQuery table')
    baseDF = spark.read \
        .format('bigquery') \
        .load(f'{projectID}.oda_raw_zone.crimes_raw')
    logger.info('....===================================')
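    # Note (assumption, not in the original excerpt): the 'bigquery' source
    # requires the spark-bigquery connector on the job's classpath, e.g. via
    # the Dataproc connectors init action or a --jars submission flag.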
    # 3.2. Register temp view
    logger.info('....Creating a temp view')
    baseDF.createOrReplaceTempView("crimes_raw")
    # count() forces the BigQuery read eagerly so errors surface here.
    logger.info(f'....Raw crimes count: {baseDF.count()}')
    logger.info('....===================================')
    # 3.3. Then create the curated crimes SQL (no trailing semicolon;
    # spark.sql() takes a single statement)
    curatedCrimesSQL = """
        SELECT
            case_number,
            primary_type AS case_type,
            date AS case_date,
            year AS case_year,
            date_format(date, 'MMM') AS case_month,
            date_format(date, 'E') AS case_day_of_week,
            hour(date) AS case_hour_of_day
        FROM crimes_raw
    """
    print(f"Curated Crimes SQL: {curatedCrimesSQL}")
    logger.info('....===================================')
    try:
        # 4. Drop table if exists
        #logger.info('....Dropping table if it exists')
        #spark.sql(f"DROP TABLE IF EXISTS {tableFQN}").show(truncate=False)
        #logger.info('....===================================')

        # 5. Curate crimes
        logger.info('....Creating dataframe')
        curatedCrimesDF = spark.sql(curatedCrimesSQL)
        # Assign the result: dropDuplicates() returns a new DataFrame and
        # does not mutate the original in place.
        curatedCrimesDF = curatedCrimesDF.dropDuplicates()
        logger.info(f'....Curated crimes count: {curatedCrimesDF.count()}')
        logger.info('....===================================')
        # 6. Persist to the data lake bucket in the curated zone;
        # repartition(17) caps the number of output Parquet files
        logger.info('....Persisting dataframe in overwrite mode')
        print(f"persistencePath is {persistencePath}")
        curatedCrimesDF.repartition(17).write.parquet(persistencePath, mode='overwrite')
        logger.info('....===================================')
        # 7. Create table definition (single-quoted location literal, no
        # trailing semicolon)
        logger.info('....Create table')
        CREATE_TABLE_DDL = f"""
            CREATE TABLE IF NOT EXISTS {tableFQN}(
                case_number string,
                case_type string,
                case_date timestamp,
                case_year long,
                case_month string,
                case_day_of_week string,
                case_hour_of_day integer
            )
            STORED AS PARQUET
            LOCATION '{persistencePath}'
        """
        print(f"Create Curated Crimes DDL: {CREATE_TABLE_DDL}")
        spark.sql(CREATE_TABLE_DDL).show(truncate=False)
        logger.info('....===================================')
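        # Note (assumption, not in the original excerpt): Hive-style DDL such
        # as STORED AS PARQUET needs a Hive-enabled catalog
        # (spark.sql.catalogImplementation=hive), which Dataproc clusters
        # provide by default.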
        # 8. Refresh table so the newly written files are visible
        logger.info('....Refresh table')
        spark.sql(f"REFRESH TABLE {tableFQN}").show(truncate=False)
        logger.info('....===================================')
        # 9. Remove _SUCCESS file
        logger.info('....Deleting _SUCCESS')
        fnDeleteSuccessFlagFile(persistencePath)
        logger.info('....===================================')
    except RuntimeError as coreError:
        logger.error(coreError)
    else:
        logger.info('Successfully completed curating Chicago crimes!')
    # }} End main
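

# ---------------------------------------------------------------------------
# Hedged sketch (not in the original excerpt): fnMain calls
# fnDeleteSuccessFlagFile, but its definition sits outside this section. A
# minimal version, assuming the persistence path is a gs:// URI and the
# google-cloud-storage client library is installed, could look like this:
# ---------------------------------------------------------------------------
def fnDeleteSuccessFlagFile(bucketUri):
    """Delete the _SUCCESS marker Spark writes next to its Parquet output,
    so downstream discovery does not treat it as data."""
    import re
    from google.cloud import storage

    match = re.match(r'gs://([^/]+)/(.+)', bucketUri)
    if not match:
        return
    bucketName, prefix = match.groups()
    blob = storage.Client().bucket(bucketName).blob(f"{prefix.rstrip('/')}/_SUCCESS")
    if blob.exists():
        blob.delete()


# Hypothetical entry point (also an assumption): the real script presumably
# builds its parser elsewhere; this sketch wires fnMain to argparse using the
# argument names seen above, including the misspelled --peristencePath.
if __name__ == "__main__":
    import argparse
    import logging

    parser = argparse.ArgumentParser(description='Curate Chicago crimes')
    parser.add_argument('--projectID', required=True)
    parser.add_argument('--tableFQN', required=True)
    parser.add_argument('--peristencePath', required=True)

    logging.basicConfig(level=logging.INFO)
    fnMain(logging.getLogger(__name__), parser.parse_args())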