def main()

in lib/glue_scripts/etl_raw_to_conformed.py [0:0]
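main() relies on the usual AWS Glue job boilerplate being defined at module level: the resolved job arguments (args), the Spark and Glue contexts (spark, glueContext), the Job handle (job), and the DynamicFrame and Map imports. A minimal sketch of that setup, assuming the job parameter list matches the keys read from args below (the real script's imports and parameters may differ):

import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import Map
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Assumed job parameters, inferred from the args[...] lookups inside main()
args = getResolvedOptions(sys.argv, [
    'JOB_NAME', 'source_bucketname', 'source_key', 'base_file_name',
    'target_bucketname', 'target_databasename', 'table_name'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)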


def main():
    # Build the S3 path of the raw CSV file to ingest
    source_path = f"s3://{args['source_bucketname']}/{args['source_key']}/{args['base_file_name']}"
    print(source_path)

    # Read the raw CSV into a DataFrame, inferring the schema and dropping malformed rows
    df = spark.read.format('csv') \
        .option('header', 'true') \
        .option('delimiter', ',') \
        .option('inferSchema', 'true') \
        .option('mode', 'DROPMALFORMED') \
        .load(source_path)

    # Create or update the target table in the Glue Data Catalog, pointing at the conformed storage location
    target_s3_location = f"s3://{args['target_bucketname']}/datalake_blog/"
    storage_location = target_s3_location + args['table_name']
    upsert_catalog_table(df, args['target_databasename'], args['table_name'], 'PARQUET', storage_location)

    # Enable dynamic partition overwrites so only the partitions being written are replaced
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    spark.conf.set('hive.exec.dynamic.partition', 'true')
    spark.conf.set('hive.exec.dynamic.partition.mode', 'nonstrict')

    # Convert to a DynamicFrame and add the year/month/day partition columns via add_partition
    dynamic_df = DynamicFrame.fromDF(df, glueContext, 'table_df')
    dynamic_df.show(5)
    mapped_dyf = Map.apply(frame=dynamic_df, f=add_partition)
    df_final = mapped_dyf.toDF()
    df_final.show(5)
    # get dataframe schema
    my_schema = list(df_final.schema)
    print(my_schema)
    null_cols = []

    # iterate over schema list to filter for NullType columns
    for st in my_schema:
        if str(st.dataType) == 'NullType':
            null_cols.append(st)

    # cast NullType columns to string so they can be written to Parquet
    for ncol in null_cols:
        df_final = df_final.withColumn(ncol.name, df_final[ncol.name].cast('string'))

    df_final.show(5)
    # Write the conformed data as partitioned Parquet; dynamic overwrite replaces only the affected partitions
    df_final.write.partitionBy('year', 'month', 'day').format('parquet').save(storage_location, mode='overwrite')

    # Register the newly written partitions with the catalog so they are queryable
    target_table_name = f"{args['target_databasename']}.{args['table_name']}"
    spark.sql(f'ALTER TABLE {target_table_name} RECOVER PARTITIONS')

    job.commit()
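
Two helpers referenced above, add_partition and upsert_catalog_table, are defined elsewhere in the script. A plausible sketch of add_partition, assuming the year/month/day values come from the processing date (the real helper may derive them from a field in the record instead):

from datetime import datetime, timezone

def add_partition(rec):
    # Map.apply passes each record as a dict; add the partition keys the Parquet write expects
    now = datetime.now(timezone.utc)
    rec['year'] = now.strftime('%Y')
    rec['month'] = now.strftime('%m')
    rec['day'] = now.strftime('%d')
    return rec

And a hedged sketch of upsert_catalog_table using boto3, assuming it creates the Glue Data Catalog table on first run and updates it afterwards; the actual implementation in the script may differ:

import boto3

def upsert_catalog_table(df, database, table_name, file_format, storage_location):
    # Derive catalog columns from the DataFrame schema; partition keys are declared separately
    columns = [{'Name': f.name, 'Type': f.dataType.simpleString()} for f in df.schema.fields]
    table_input = {
        'Name': table_name,
        'TableType': 'EXTERNAL_TABLE',
        'PartitionKeys': [{'Name': k, 'Type': 'string'} for k in ('year', 'month', 'day')],
        'StorageDescriptor': {
            'Columns': columns,
            'Location': storage_location,
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'},
        },
    }
    glue = boto3.client('glue')
    try:
        glue.create_table(DatabaseName=database, TableInput=table_input)
    except glue.exceptions.AlreadyExistsException:
        glue.update_table(DatabaseName=database, TableInput=table_input)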