def main()

in lib/glue_scripts/etl_conformed_to_purposebuilt.py [0:0]
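
main() reads the per-table SQL transform from S3, runs it with Spark against tables already registered in the AWS Glue Data Catalog, creates or updates the target table definition, and writes partitioned Parquet to the purpose-built bucket. It relies on objects defined elsewhere in the script: the resolved job arguments args, the spark session, and the upsert_catalog_table helper. As a point of reference, below is a minimal sketch of the standard Glue boilerplate that typically supplies args and spark; the parameter names are taken from the function body, but the actual setup in the script may differ.

import sys

import boto3
import botocore
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Resolve the job parameters referenced in main(); names taken from the function body
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'txn_bucket_name',
    'txn_sql_prefix_path',
    'target_bucketname',
    'target_database_name',
    'table_name',
])

# Standard Glue setup: a SparkSession backed by a GlueContext
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session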


def main():
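    # Build the object key of this table's SQL transform, dropping the
    # leading '/' from the configured prefix: <prefix><table_name>.sql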
    prefix = args['txn_sql_prefix_path']
    key = prefix[1:] + args['table_name'] + '.sql'

    print(key)

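    # Fetch this table's SQL statement from the bucket given by txn_bucket_name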
    try:
        s3 = boto3.client('s3')
        response = s3.get_object(
            Bucket=args['txn_bucket_name'],
            Key=key
        )
    except botocore.exceptions.ClientError as error:
        print('[ERROR] Glue job client process failed: {}'.format(error))
        raise error
    except Exception as e:
        print('[ERROR] Glue job function call failed: {}'.format(e))
        raise e

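    # Run the fetched SQL with Spark SQL to produce the result DataFrame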
    df = spark.sql(response['Body'].read().decode('utf-8'))

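    # Derive the target S3 location and create/update the table in the Glue Data Catalog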
    target_s3_location = "s3://" + args['target_bucketname'] + "/datalake_blog/"
    storage_location = target_s3_location + args['table_name']
    upsert_catalog_table(df, args['target_database_name'], args['table_name'], 'PARQUET', storage_location)

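    # Dynamic partition overwrite: only the partitions present in df are replaced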
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    spark.conf.set('hive.exec.dynamic.partition', 'true')
    spark.conf.set('hive.exec.dynamic.partition.mode', 'nonstrict')

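    # Write the result as Parquet to S3, partitioned by year/month/day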
    df.write.partitionBy('year', 'month', 'day').format('parquet').save(storage_location, mode='overwrite')

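    # Register the newly written partitions with the catalog table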
    target_table_name = args['target_database_name'] + '.' + args['table_name']
    spark.sql(f'ALTER TABLE {target_table_name} RECOVER PARTITIONS')
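
The upsert_catalog_table helper called above is defined elsewhere in the script and is not shown in this section. The sketch below illustrates one way such a create-or-update of the Glue Data Catalog table could be done with the boto3 Glue API, assuming the year/month/day partition keys used in the write above; it shows the pattern, not the repo's actual implementation.

def upsert_catalog_table(df, database, table_name, file_format, storage_location):
    # Illustrative sketch only; the script's real helper may differ.
    partition_keys = ['year', 'month', 'day']   # matches the partitionBy() call in main()
    dtypes = dict(df.dtypes)                    # Spark type strings are Hive-compatible here
    columns = [{'Name': name, 'Type': col_type}
               for name, col_type in df.dtypes if name not in partition_keys]
    table_input = {
        'Name': table_name,
        'TableType': 'EXTERNAL_TABLE',
        'Parameters': {'classification': file_format.lower()},
        'PartitionKeys': [{'Name': name, 'Type': dtypes.get(name, 'string')}
                          for name in partition_keys],
        'StorageDescriptor': {
            'Columns': columns,
            'Location': storage_location,
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {
                'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            },
        },
    }
    glue = boto3.client('glue')
    try:
        # Update the table definition if it already exists, otherwise create it
        glue.update_table(DatabaseName=database, TableInput=table_input)
    except glue.exceptions.EntityNotFoundException:
        glue.create_table(DatabaseName=database, TableInput=table_input)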