def run_spark_job()

in processing/batch_ingest_sm_pyspark.py


# Module-level dependencies of this excerpt; logger, parse_args, and
# ingest_to_feature_store are defined elsewhere in the same file.
from datetime import datetime
from pyspark.sql import SparkSession


def run_spark_job():
    args = parse_args()
    spark_session = SparkSession.builder.appName('PySparkJob').getOrCreate()
    spark_context = spark_session.sparkContext
    # Assumes spark.executor.instances and spark.executor.cores are set in the job configuration
    conf = spark_context.getConf()
    total_cores = int(conf.get('spark.executor.instances')) * int(conf.get('spark.executor.cores'))
    logger.info(f'Total available cores in the Spark cluster = {total_cores}')
    logger.info('Reading input file from S3')
    df = spark_session.read.options(header=True).csv(args.s3_uri_prefix)
    
    # Transform raw features
    # We do nothing in our case
    
    logger.info(f'Number of partitions = {df.rdd.getNumPartitions()}')
    # Rule-of-thumb heuristic: take the number of executors times cores per executor, then multiply by 3 or 4
    df = df.repartition(total_cores * 3)
    logger.info(f'Number of partitions after re-partitioning = {df.rdd.getNumPartitions()}')
    logger.info(f'Feature Store ingestion start: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')
    # Ingest each partition in parallel on the executor nodes
    df.foreachPartition(lambda rows: ingest_to_feature_store(args, rows))
    logger.info(f'Feature Store ingestion complete: {datetime.now().strftime("%m/%d/%Y, %H:%M:%S")}')
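
For reference, below is a minimal sketch of what the per-partition ingest_to_feature_store helper could look like. It assumes args exposes hypothetical feature_group_name and region attributes and that every CSV column maps one-to-one onto a feature of the same name; the actual helper in processing/batch_ingest_sm_pyspark.py may differ (for example, by batching records or adding retries).

import boto3


def ingest_to_feature_store(args, rows):
    # Create the Feature Store runtime client inside the function so it is
    # instantiated on each executor rather than serialized from the driver.
    client = boto3.client('sagemaker-featurestore-runtime', region_name=args.region)
    for row in rows:
        # Convert the Spark Row into the Record structure expected by put_record,
        # skipping null values.
        record = [
            {'FeatureName': name, 'ValueAsString': str(value)}
            for name, value in row.asDict().items()
            if value is not None
        ]
        client.put_record(FeatureGroupName=args.feature_group_name, Record=record)

PutRecord ingests one record per call, which is why the repartitioning step above matters: more partitions mean more concurrent ingestion workers across the executor cores.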