in parquet_flask/io_logic/ingest_new_file.py [0:0]
def create_df(spark_session, data_list, job_id, provider, project):
    """Build a partitioned Spark writer for one batch of ingested records.

    A data frame is created from ``data_list`` and extended with:

    * a timestamp column parsed from the time column, plus its year and month;
    * the platform code, read from the ``code`` field of the platform column;
    * literal ``job_id``, ``provider`` and ``project`` columns;
    * a geo-spatial interval column of the form ``"<lat>_<lon>"``, where each
      coordinate has its remainder modulo the platform's interval subtracted
      and is then truncated to ``int``. The interval is looked up by platform
      code in ``get_geospatial_interval(project)`` and falls back to
      ``GEOSPATIAL_INTERVAL``.

    The frame is then repartitioned to a single partition and its writer is
    partitioned by provider, project, platform code, geo-spatial interval,
    year, month and job id.

    NOTE: Spark evaluates the interval UDF lazily, so a per-row failure in it
    (for example a null latitude) surfaces when the returned writer is
    executed, not inside this function.

    :param spark_session: session used to create the data frame
    :param data_list: records to ingest; only ``len()`` and
        ``createDataFrame`` are applied to it here
    :param job_id: value stored in the job id column of every row
    :param provider: value stored in the provider column of every row
    :param project: value stored in the project column of every row; also
        selects the per-platform geo-spatial intervals
    :return: ``df.write`` with ``partitionBy`` applied (nothing is written yet)
    :raises Exception: any error raised while adding the interval column or
        building the writer is logged and re-raised unchanged
    """
    LOGGER.debug(f'creating data frame with length {len(data_list)}')
    df = spark_session.createDataFrame(data_list)
    # spark_session.sparkContext.addPyFile('/usr/app/parquet_flask/lat_lon_udf.py')
    LOGGER.debug('adding columns')
    df: DataFrame = (
        df.withColumn(CDMSConstants.time_obj_col, to_timestamp(CDMSConstants.time_col))
        .withColumn(CDMSConstants.year_col, year(CDMSConstants.time_col))
        .withColumn(CDMSConstants.month_col, month(CDMSConstants.time_col))
        .withColumn(CDMSConstants.platform_code_col, df[CDMSConstants.platform_col][CDMSConstants.code_col])
        .withColumn(CDMSConstants.job_id_col, lit(job_id))
        .withColumn(CDMSConstants.provider_col, lit(provider))
        .withColumn(CDMSConstants.project_col, lit(project))
    )
    geospatial_interval_dict = get_geospatial_interval(project)

    def _geo_spatial_interval(platform_code, latitude, longitude):
        # Defined locally so it closes over geospatial_interval_dict exactly
        # like the inline lambda it replaces; the interval is looked up once
        # per row instead of once per coordinate.
        interval = int(geospatial_interval_dict.get(platform_code, GEOSPATIAL_INTERVAL))
        lat_start = int(latitude - latitude % interval)
        lon_start = int(longitude - longitude % interval)
        return f'{lat_start}_{lon_start}'

    try:
        interval_udf = pyspark_functions.udf(_geo_spatial_interval, StringType())
        df = df.withColumn(
            CDMSConstants.geo_spatial_interval_col,
            interval_udf(
                df[CDMSConstants.platform_code_col],
                df[CDMSConstants.lat_col],
                df[CDMSConstants.lon_col],
            ),
        )
        df = df.repartition(1)  # combine to 1 data frame to increase size
        # .withColumn('ingested_date', lit(TimeUtils.get_current_time_str()))
        LOGGER.debug('create writer')
        all_partitions = [
            CDMSConstants.provider_col,
            CDMSConstants.project_col,
            CDMSConstants.platform_code_col,
            CDMSConstants.geo_spatial_interval_col,
            CDMSConstants.year_col,
            CDMSConstants.month_col,
            CDMSConstants.job_id_col,
        ]
        # df = df.repartition(1)  # XXX: is this line repeated?
        df_writer = df.write
        LOGGER.debug('create partitions')
        df_writer = df_writer.partitionBy(all_partitions)
        LOGGER.debug('created partitions')
    except Exception:
        # Log the column *names* only. Indexing df here (as the previous
        # message did) yields Column objects rather than values, and raises
        # again when the column is the thing that could not be resolved,
        # which would hide the original error.
        LOGGER.exception(
            f'unexpected exception. latitude column: {CDMSConstants.lat_col}. '
            f'longitude column: {CDMSConstants.lon_col}'
        )
        raise
    return df_writer