def upsert_catalog_table()

in lib/glue_scripts/etl_raw_to_conformed.py [0:0]


def upsert_catalog_table(df, target_database, table_name, classification, storage_location):
    """
    Function to upsert catalog table
    @param df:
    @param target_database:
    @param table_name:
    @param classification:
    @param storage_location:
    """
    create_database()
    df_schema = df.dtypes
    schema = []
    for s in df_schema:
        if s[1] == 'decimal(10,0)':
            print('converting decimal(10,0) to int')
            v = {'Name': s[0], 'Type': 'int'}
        elif s[1] == 'null':
            v = {'Name': s[0], 'Type': 'string'}
        else:
            v = {'Name': s[0], 'Type': s[1]}
        schema.append(v)

    print(schema)
    input_format = 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
    output_format = 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    serde_info = {
        'Parameters': {
            'serialization.format': '1'
        },
        'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    }
    storage_descriptor = {
        'Columns': schema,
        'InputFormat': input_format,
        'OutputFormat': output_format,
        'SerdeInfo': serde_info,
        'Location': storage_location
    }

    partition_key = [
        {'Name': 'year', 'Type': 'string'},
        {'Name': 'month', 'Type': 'string'},
        {'Name': 'day', 'Type': 'string'},
    ]
    table_input = {
        'Name': table_name,
        'StorageDescriptor': storage_descriptor,
        'Parameters': {
            'classification': classification,
            'SourceType': 's3',
            'SourceTableName': table_name,
            'TableVersion': '0'
        },
        'TableType': 'EXTERNAL_TABLE',
        'PartitionKeys': partition_key
    }

    try:
        glue_client = boto3.client('glue')
        if not table_exists(target_database, table_name):
            print('[INFO] Target Table name: {} does not exist.'.format(table_name))
            glue_client.create_table(DatabaseName=target_database, TableInput=table_input)
        else:
            print('[INFO] Trying to update: TargetTable: {}'.format(table_name))
            glue_client.update_table(DatabaseName=target_database, TableInput=table_input)
    except botocore.exceptions.ClientError as error:
        print('[ERROR] Glue job client process failed:{}'.format(error))
        raise error
    except Exception as e:
        print('[ERROR] Glue job function call failed:{}'.format(e))
        raise e