in dataplex-quickstart-labs/00-resources/scripts/pyspark/nyc-taxi-trip-analytics/taxi_trips_data_generator.py
def fnMain(logger, args):
    # {{ Start main

    # 1. Capture Spark application input
    projectID = args.projectID
    tableFQN = args.tableFQN
    peristencePath = args.peristencePath

    # 2. Create Spark session
    logger.info('....Initializing spark & spark configs')
    spark = SparkSession.builder.appName("NYC Taxi trip dataset generator").getOrCreate()
    logger.info('....===================================')

    # 3. Read base data from BigQuery
    logger.info('....Creating a base DF off of a BigQuery table')
    baseDF = spark.read \
        .format('bigquery') \
        .load(tableFQN)
    logger.info('....===================================')

    try:
        # 4. Persist to Cloud Storage as partitioned Parquet
        logger.info('....Persisting dataframe in overwrite mode')
        baseDF.coalesce(2).write.partitionBy("trip_year", "trip_month", "trip_day").parquet(peristencePath, mode='overwrite')
        logger.info('....===================================')

        # 5. Delete the _SUCCESS flag file left behind by the write
        logger.info('....Deleting _SUCCESS')
        fnDeleteSuccessFlagFile(peristencePath)
        logger.info('....===================================')

    except RuntimeError as coreError:
        logger.error(coreError)
    else:
        logger.info('Successfully completed persisting NYC taxi trips!')
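
# ------------------------------------------------------------------------------
# fnDeleteSuccessFlagFile is referenced above but defined elsewhere in this file.
# The sketch below is illustrative only: it assumes the helper simply removes the
# _SUCCESS flag file that the Parquet write leaves behind, using the Hadoop
# FileSystem API reachable through the active Spark session. It is not the
# repository's actual implementation; names and approach are assumptions.
# ------------------------------------------------------------------------------
def fnDeleteSuccessFlagFile(persistencePath):
    # Assumes `from pyspark.sql import SparkSession` at the top of the file
    spark = SparkSession.getActiveSession()
    hadoopConfiguration = spark._jsc.hadoopConfiguration()
    jvm = spark.sparkContext._jvm
    successFlagPath = jvm.org.apache.hadoop.fs.Path(f"{persistencePath}/_SUCCESS")
    fileSystem = successFlagPath.getFileSystem(hadoopConfiguration)
    if fileSystem.exists(successFlagPath):
        fileSystem.delete(successFlagPath, False)  # non-recursive; removes only the flag file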
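
# ------------------------------------------------------------------------------
# A minimal, illustrative entry point for invoking fnMain. The real argument
# parser and __main__ block live elsewhere in this file; the flag names below
# simply mirror the attributes read at the top of fnMain (projectID, tableFQN,
# peristencePath) and are assumptions, not the repository's actual CLI.
# ------------------------------------------------------------------------------
import argparse  # these imports would normally sit at the top of the file
import logging

def fnParseArguments():
    # Parse the three inputs fnMain expects
    argsParser = argparse.ArgumentParser(description='NYC taxi trip dataset generator')
    argsParser.add_argument('--projectID', type=str, required=True, help='GCP project ID')
    argsParser.add_argument('--tableFQN', type=str, required=True, help='Fully qualified BigQuery table (project.dataset.table)')
    argsParser.add_argument('--peristencePath', type=str, required=True, help='GCS path for the partitioned Parquet output')
    return argsParser.parse_args()

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger('taxi_trips_data_generator')
    fnMain(logger, fnParseArguments())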