def fnMain()

in dataplex-quickstart-labs/00-resources/scripts/pyspark/nyc-taxi-trip-analytics/taxi_trips_data_generator.py [0:0]


def fnMain(logger, args):
    """Copy a BigQuery table to Cloud Storage as partitioned Parquet.

    Reads ``args.tableFQN`` through the Spark ``bigquery`` data source and
    writes it in overwrite mode to ``args.peristencePath``, partitioned by
    ``trip_year``/``trip_month``/``trip_day``. It then calls
    ``fnDeleteSuccessFlagFile`` on the output path.

    Args:
        logger: Logger used for progress and error messages.
        args: Parsed command-line arguments. This function reads
            ``tableFQN`` and ``peristencePath``; the latter keeps the
            attribute spelling the caller defines. ``projectID`` is not
            used here.

    Raises:
        Exception: Any failure while reading, writing or cleaning up is
            logged with its traceback and re-raised, so the Spark job ends
            with a failure instead of reporting success.
    """
    # {{ Start main

    # 1. Capture Spark application input
    table_fqn = args.tableFQN
    persistence_path = args.peristencePath

    # 2. Create Spark session
    logger.info('....Initializing spark & spark configs')
    spark = SparkSession.builder.appName("NYC Taxi trip dataset generator").getOrCreate()
    logger.info('....===================================')

    try:
        # 3. Read base data in BigQuery
        logger.info('....Creating a base DF off of a BigQuery table')
        base_df = spark.read.format('bigquery').load(table_fqn)
        logger.info('....===================================')

        # 4. Persist to Cloud Storage
        logger.info('....Persisting dataframe in overwrite mode')
        (
            base_df.coalesce(2)  # merge into at most 2 Spark partitions before writing
            .write.partitionBy("trip_year", "trip_month", "trip_day")
            .parquet(persistence_path, mode='overwrite')
        )
        logger.info('....===================================')

        # 5. Delete flag files
        logger.info('....Deleting _SUCCESS')
        fnDeleteSuccessFlagFile(persistence_path)
        logger.info('....===================================')
    except Exception:
        # Spark errors surfaced in Python (e.g. Py4JJavaError, AnalysisException)
        # do not derive from RuntimeError, so a narrower except would skip this
        # logging. Re-raise so a failed run is not reported as a success.
        logger.exception('....Failed to persist NYC taxi trips')
        raise
    else:
        logger.info('Successfully completed persisting NYC taxi trips!')