in lib/glue_scripts/etl_conformed_to_purposebuilt.py [0:0]
def upsert_catalog_table(df, target_database, table_name, classification, storage_location):
    """
    Create or update the Glue Data Catalog entry describing a Parquet table.

    The column list is taken from ``df.dtypes``; the ``year``, ``month`` and
    ``day`` columns are left out of it and declared as string partition keys
    instead. When ``table_exists`` reports the table as missing it is created,
    otherwise its definition is replaced through ``update_table``.

    @param df: object whose ``dtypes`` yields (column name, type string) pairs
        (presumably a Spark DataFrame -- confirm against callers)
    @param target_database: Glue database the table belongs to
    @param table_name: table name; also stored as the ``SourceTableName`` parameter
    @param classification: value stored in the table's ``classification`` parameter
    @param storage_location: location recorded in the storage descriptor
        (presumably an S3 path, given ``SourceType`` is ``s3`` -- confirm)
    @raise botocore.exceptions.ClientError: Glue client error, logged and re-raised
    @raise Exception: any other failure inside the catalog calls, logged and re-raised
    """
    # Defined elsewhere in this module; presumably ensures the target database
    # exists before the table is touched.
    create_database()

    partition_names = ('year', 'month', 'day')
    columns = []
    for field in df.dtypes:
        col_name, col_type = field[0], field[1]
        # Partition columns are declared under PartitionKeys, not Columns.
        if col_name in partition_names:
            continue
        # Two type strings are rewritten before registration; every other
        # type string is passed through unchanged.
        if col_type == "decimal(10,0)":
            col_type = "int"
        elif col_type == "null":
            col_type = "string"
        columns.append({"Name": col_name, "Type": col_type})
    print(columns)

    table_input = {
        'Name': table_name,
        'StorageDescriptor': {
            'Columns': columns,
            'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
            'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
            'SerdeInfo': {
                'Parameters': {
                    'serialization.format': '1'
                },
                'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
            },
            'Location': storage_location
        },
        'Parameters': {
            'classification': classification,
            'SourceType': 's3',
            'SourceTableName': table_name,
            'TableVersion': '0'
        },
        'TableType': 'EXTERNAL_TABLE',
        'PartitionKeys': [{'Name': part, 'Type': 'string'} for part in partition_names]
    }

    try:
        glue = boto3.client('glue')
        if table_exists(target_database, table_name):
            print('[INFO] Trying to update: TargetTable: {}'.format(table_name))
            glue.update_table(DatabaseName=target_database, TableInput=table_input)
        else:
            print('[INFO] Target Table name: {} does not exist.'.format(table_name))
            glue.create_table(DatabaseName=target_database, TableInput=table_input)
    except botocore.exceptions.ClientError as client_err:
        print('[ERROR] Glue job client process failed:{}'.format(client_err))
        raise client_err
    except Exception as exc:
        print('[ERROR] Glue job function call failed:{}'.format(exc))
        raise exc