def upsert_catalog_table()

in lib/glue_scripts/etl_conformed_to_purposebuilt.py [0:0]


def upsert_catalog_table(df, target_database, table_name, classification, storage_location):
    """
    Create or update the Glue Data Catalog table that describes ``df``.

    The column list is built from ``df.dtypes``. Columns named ``year``,
    ``month`` and ``day`` are left out of the column list and registered as
    string partition keys instead. The table is declared as a Parquet
    ``EXTERNAL_TABLE`` whose storage descriptor points at ``storage_location``.

    Type adjustments applied to the reported column types:
      * ``decimal(10,0)`` is registered as ``int``
      * an untyped column (``null`` / ``void``) is registered as ``string``
      * every other type string is passed through unchanged

    @param df: data frame whose ``dtypes`` yields ``(column_name, type_string)`` pairs
    @param target_database: name of the Glue database that holds the table
    @param table_name: name of the table to create or update
    @param classification: value stored in the table's ``classification`` parameter
    @param storage_location: value stored as the storage descriptor ``Location``
    @raise botocore.exceptions.ClientError: logged and re-raised when raised
        inside the create/update step
    @raise Exception: any other failure in that step is logged and re-raised
    """
    create_database()

    partition_names = ('year', 'month', 'day')
    # Presumably df is a Spark DataFrame: PySpark spells NullType as "null" on
    # older releases and "void" on newer ones. Neither is a usable catalog
    # column type, so both fall back to "string".
    untyped_names = ('null', 'void')

    schema = []
    for column_name, column_type in df.dtypes:
        if column_name in partition_names:
            # Partition columns are declared through PartitionKeys below.
            continue
        if column_type == "decimal(10,0)":
            glue_type = "int"
        elif column_type in untyped_names:
            glue_type = "string"
        else:
            glue_type = column_type
        schema.append({"Name": column_name, "Type": glue_type})

    print(schema)

    storage_descriptor = {
        'Columns': schema,
        'InputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat',
        'OutputFormat': 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat',
        'SerdeInfo': {
            'Parameters': {
                'serialization.format': '1'
            },
            'SerializationLibrary': 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        },
        'Location': storage_location
    }

    partition_key = [{'Name': name, 'Type': 'string'} for name in partition_names]

    table_input = {
        'Name': table_name,
        'StorageDescriptor': storage_descriptor,
        'Parameters': {
            'classification': classification,
            'SourceType': 's3',
            'SourceTableName': table_name,
            'TableVersion': '0'
        },
        'TableType': 'EXTERNAL_TABLE',
        'PartitionKeys': partition_key
    }

    try:
        glue_client = boto3.client('glue')
        if not table_exists(target_database, table_name):
            print('[INFO] Target Table name: {} does not exist.'.format(table_name))
            glue_client.create_table(DatabaseName=target_database, TableInput=table_input)
        else:
            print('[INFO] Trying to update: TargetTable: {}'.format(table_name))
            glue_client.update_table(DatabaseName=target_database, TableInput=table_input)
    except botocore.exceptions.ClientError as error:
        print('[ERROR] Glue job client process failed:{}'.format(error))
        # Bare raise keeps the original traceback intact.
        raise
    except Exception as e:
        print('[ERROR] Glue job function call failed:{}'.format(e))
        raise