def create_df()

in parquet_flask/io_logic/ingest_new_file.py [0:0]


    def create_df(spark_session, data_list, job_id, provider, project):
        """
        Build a partitioned Spark ``DataFrameWriter`` from the given records.

        The records are loaded into a data frame, enriched with derived
        columns (timestamp object, year, month, platform code, job id,
        provider, project and geo-spatial interval), collapsed into a single
        partition and wrapped in a writer partitioned by those columns.
        Nothing is written here; the returned writer is handed to the caller.

        :param spark_session: Spark session used to create the data frame
        :param data_list: records passed to ``createDataFrame``; the code reads
            the time, platform (with a nested code field), latitude and
            longitude columns named by ``CDMSConstants``
        :param job_id: value stored as a literal in the job id column
        :param provider: value stored as a literal in the provider column
        :param project: value stored as a literal in the project column; also
            passed to ``get_geospatial_interval`` to obtain the per-platform
            interval mapping
        :return: ``df.write`` partitioned by provider, project, platform code,
            geo-spatial interval, year, month and job id
        :raises Exception: errors raised while adding the geo-spatial interval
            column or creating the writer are logged and re-raised unchanged
        """
        LOGGER.debug(f'creating data frame with length {len(data_list)}')
        df: DataFrame = spark_session.createDataFrame(data_list)
        # spark_session.sparkContext.addPyFile('/usr/app/parquet_flask/lat_lon_udf.py')
        LOGGER.debug('adding columns')
        df = (
            df.withColumn(CDMSConstants.time_obj_col, to_timestamp(CDMSConstants.time_col))
            .withColumn(CDMSConstants.year_col, year(CDMSConstants.time_col))
            .withColumn(CDMSConstants.month_col, month(CDMSConstants.time_col))
            .withColumn(CDMSConstants.platform_code_col, df[CDMSConstants.platform_col][CDMSConstants.code_col])
            .withColumn(CDMSConstants.job_id_col, lit(job_id))
            .withColumn(CDMSConstants.provider_col, lit(provider))
            .withColumn(CDMSConstants.project_col, lit(project))
        )
        geospatial_interval_dict = get_geospatial_interval(project)

        # NOTE(review): kept as a nested closure on purpose. A closure is
        # serialized together with the UDF, whereas a module-level helper would
        # presumably have to be importable on the executors (see the disabled
        # addPyFile call above) -- confirm before moving it.
        def to_geo_spatial_interval(platform_code, latitude, longitude):
            """Return '<lat>_<lon>', each coordinate reduced by its remainder modulo the interval."""
            # Look the interval up once and reuse it for both coordinates.
            interval = int(geospatial_interval_dict.get(platform_code, GEOSPATIAL_INTERVAL))
            lat_start = int(latitude - divmod(latitude, interval)[1])
            lon_start = int(longitude - divmod(longitude, interval)[1])
            return f'{lat_start}_{lon_start}'

        all_partitions = [
            CDMSConstants.provider_col,
            CDMSConstants.project_col,
            CDMSConstants.platform_code_col,
            CDMSConstants.geo_spatial_interval_col,
            CDMSConstants.year_col,
            CDMSConstants.month_col,
            CDMSConstants.job_id_col,
        ]
        try:
            geo_spatial_interval_udf = pyspark_functions.udf(to_geo_spatial_interval, StringType())
            df = df.withColumn(
                CDMSConstants.geo_spatial_interval_col,
                geo_spatial_interval_udf(
                    df[CDMSConstants.platform_code_col],
                    df[CDMSConstants.lat_col],
                    df[CDMSConstants.lon_col],
                ),
            )
            df = df.repartition(1)  # combine to 1 data frame to increase size
            # .withColumn('ingested_date', lit(TimeUtils.get_current_time_str()))
            LOGGER.debug('create writer')
            df_writer = df.write
            LOGGER.debug('create partitions')
            df_writer = df_writer.partitionBy(all_partitions)
            LOGGER.debug('created partitions')
        except Exception:
            # Do not dereference df[...] here: a column lookup yields a Column
            # object (never row values) and raises again when the column cannot
            # be resolved, which would skip this log and mask the original error.
            LOGGER.exception(
                f'unexpected exception while adding {CDMSConstants.geo_spatial_interval_col} '
                f'or creating the partitioned writer. latitude column: {CDMSConstants.lat_col}. '
                f'longitude column: {CDMSConstants.lon_col}'
            )
            raise
        return df_writer