in lib/glue_scripts/etl_raw_to_conformed.py [0:0]
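# --- Assumed module-level setup (outside this excerpt) ----------------------
# main() relies on `args`, `spark`, `glueContext`, `job`, `DynamicFrame`, `Map`,
# and the helpers `upsert_catalog_table` / `add_partition` being defined
# elsewhere in the script. A minimal sketch of that boilerplate, following the
# standard Glue job pattern, might look like this (the exact argument list is
# taken from the keys main() reads from `args`):
import sys

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.transforms import Map
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Job parameters supplied by the Glue job definition.
args = getResolvedOptions(sys.argv, [
    'JOB_NAME', 'source_bucketname', 'source_key', 'base_file_name',
    'target_bucketname', 'target_databasename', 'table_name',
])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
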
def main():
    # Build the S3 path of the raw file dropped into the source bucket.
    source_path = f"s3://{args['source_bucketname']}/{args['source_key']}/{args['base_file_name']}"
    print(source_path)

    # Read the raw CSV, inferring the schema and dropping malformed rows.
    df = spark.read.format('csv') \
        .option('header', 'true') \
        .option('delimiter', ',') \
        .option('inferSchema', 'true') \
        .option('mode', 'DROPMALFORMED') \
        .load(source_path)
    # Target prefix in the conformed bucket; one prefix per table.
    target_s3_location = f"s3://{args['target_bucketname']}/datalake_blog/"
    storage_location = target_s3_location + args['table_name']

    # Create or update the table definition in the Glue Data Catalog.
    upsert_catalog_table(df, args['target_databasename'], args['table_name'], 'PARQUET', storage_location)

    # Overwrite only the partitions being written, not the whole table.
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    spark.conf.set('hive.exec.dynamic.partition', 'true')
    spark.conf.set('hive.exec.dynamic.partition.mode', 'nonstrict')
    # Convert to a DynamicFrame and add year/month/day partition columns to
    # every record via the add_partition mapper.
    dynamic_df = DynamicFrame.fromDF(df, glueContext, 'table_df')
    dynamic_df.show(5)
    mapped_dyf = Map.apply(frame=dynamic_df, f=add_partition)
    df_final = mapped_dyf.toDF()
    df_final.show(5)
    # Spark infers columns that are entirely null as NullType, which cannot be
    # written to Parquet, so cast any NullType columns to string.
    my_schema = list(df_final.schema)
    print(my_schema)
    null_cols = [field for field in my_schema if str(field.dataType) == 'NullType']
    for ncol in null_cols:
        df_final = df_final.withColumn(ncol.name, df_final[ncol.name].cast('string'))
    df_final.show(5)
    # Write partitioned Parquet to the conformed bucket, then register the new
    # partitions with the catalog so they are queryable right away.
    df_final.write.partitionBy('year', 'month', 'day').format('parquet').save(storage_location, mode='overwrite')
    target_table_name = args['target_databasename'] + '.' + args['table_name']
    spark.sql(f'ALTER TABLE {target_table_name} RECOVER PARTITIONS')
    job.commit()
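

# --- Assumed helpers (defined elsewhere in the script, sketched here) -------
# The real implementations are outside this excerpt; the sketches below only
# illustrate the shape main() depends on. The partition derivation, catalog
# settings, and entry-point guard are assumptions, not the repository's code.
import datetime

import boto3


def add_partition(record):
    """Hypothetical Map.apply() callback: attach year/month/day partition
    keys to each record (here derived from the processing date)."""
    today = datetime.date.today()
    record['year'] = f'{today.year:04d}'
    record['month'] = f'{today.month:02d}'
    record['day'] = f'{today.day:02d}'
    return record


def upsert_catalog_table(df, database, table_name, file_format, location):
    """Hypothetical sketch: create or update the Glue Data Catalog table so
    the partitioned Parquet output written by main() is queryable."""
    glue = boto3.client('glue')
    table_input = {
        'Name': table_name,
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {'classification': file_format.lower()},
        'PartitionKeys': [{'Name': c, 'Type': 'string'} for c in ('year', 'month', 'day')],
        'StorageDescriptor': {
            'Columns': [{'Name': f.name, 'Type': f.dataType.simpleString()} for f in df.schema.fields],
            'Location': location,
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'},
        },
    }
    try:
        glue.update_table(DatabaseName=database, TableInput=table_input)
    except glue.exceptions.EntityNotFoundException:
        glue.create_table(DatabaseName=database, TableInput=table_input)


if __name__ == '__main__':
    main()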