lib/glue_scripts/etl_conformed_to_purposebuilt.py
def main():
    # Build the S3 key of the per-table transformation SQL, e.g. <prefix>/<table_name>.sql
    prefix = args['txn_sql_prefix_path']
    key = prefix[1:] + args['table_name'] + '.sql'
    print(key)
    try:
        # Fetch the SQL statement for this table from the transformation bucket
        s3 = boto3.client('s3')
        response = s3.get_object(
            Bucket=args['txn_bucket_name'],
            Key=key
        )
    except botocore.exceptions.ClientError as error:
        print('[ERROR] Glue job client process failed: {}'.format(error))
        raise error
    except Exception as e:
        print('[ERROR] Glue job function call failed: {}'.format(e))
        raise e

    # Run the SQL against the conformed layer to produce the purpose-built dataset
    df = spark.sql(response['Body'].read().decode('utf-8'))

    # Register (or update) the target table in the Glue Data Catalog
    target_s3_location = "s3://" + args['target_bucketname'] + "/datalake_blog/"
    storage_location = target_s3_location + args['table_name']
    upsert_catalog_table(df, args['target_database_name'], args['table_name'], 'PARQUET', storage_location)

    # Overwrite only the partitions written in this run, not the whole table
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
    spark.conf.set('hive.exec.dynamic.partition', 'true')
    spark.conf.set('hive.exec.dynamic.partition.mode', 'nonstrict')
    df.write.partitionBy('year', 'month', 'day').format('parquet').save(storage_location, mode='overwrite')

    # Make the newly written partitions visible in the catalog
    target_table_name = args['target_database_name'] + '.' + args['table_name']
    spark.sql(f'ALTER TABLE {target_table_name} RECOVER PARTITIONS')
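The function above relies on `args`, `spark`, and `upsert_catalog_table` being defined earlier in the script (not shown here). A minimal sketch of what that setup could look like for a Glue PySpark job, assuming the standard Glue boilerplate; the argument names are taken from the snippet, while 'JOB_NAME' and the overall layout are assumptions:

    import sys

    import boto3
    import botocore
    from awsglue.context import GlueContext
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext

    # Resolve the job parameters referenced in main(); 'JOB_NAME' is the
    # conventional Glue parameter and is assumed here.
    args = getResolvedOptions(sys.argv, [
        'JOB_NAME',
        'txn_bucket_name',
        'txn_sql_prefix_path',
        'table_name',
        'target_bucketname',
        'target_database_name',
    ])

    # Glue provides a Hive-enabled SparkSession via GlueContext.
    sc = SparkContext()
    glue_context = GlueContext(sc)
    spark = glue_context.spark_session

    # upsert_catalog_table(df, database, table, file_format, location) is a helper
    # defined elsewhere in the script that creates or updates the catalog entry.

    if __name__ == '__main__':
        main()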