in 07-module-feature-monitoring/feature_monitoring_utils.py [0:0]
def delete_fg_snapshot_ctas(fg_name, verbose, table=""):
    """Drop a temporary CTAS snapshot table of a Feature Group and delete its S3 data.

    Parameters
    ----------
    fg_name : str
        Name of the SageMaker Feature Group whose snapshot table is removed.
    verbose : bool
        When True, print the table's S3 location and Athena query statistics.
    table : str, optional
        Name of the snapshot table to drop. Defaults to the Feature Group's
        offline-store table name suffixed with ``ctas_table_suffix``.

    NOTE(review): relies on module-level globals (``feature_store_session``,
    ``region``, ``account_id``, ``ctas_table_suffix``, ``connect``,
    ``get_s3_uri_from_athena_table``) defined elsewhere in this file.
    """
    # Retrieve the FG's offline-store metadata once — describe() is a remote
    # API call, so don't repeat it for each field.
    fg = FeatureGroup(name=fg_name, sagemaker_session=feature_store_session)
    offline_config = fg.describe()['OfflineStoreConfig']
    table_name = offline_config['DataCatalogConfig']['TableName']
    database = offline_config['DataCatalogConfig']['Database']
    if table == "":
        table = f'{table_name}{ctas_table_suffix}'
    # Retrieve property from Athena table to get S3 location of underlying files
    client = boto3.client(service_name='athena', region_name=region)
    response = client.get_table_metadata(
        CatalogName='AwsDataCatalog',
        DatabaseName=database,
        TableName=table
    )
    s3_location_ctas_table = response['TableMetadata']['Parameters']['location']
    if verbose:
        print(s3_location_ctas_table)
    # Retrieve S3 location from the creation of the temp snapshot table
    s3_uri = offline_config['S3StorageConfig']['S3Uri']
    s3_uri_athena_table = get_s3_uri_from_athena_table(database, table)
    # s3://bucket/key... — element 2 of the '/'-split is the bucket,
    # elements 3+ form the key prefix.
    bucket = s3_uri_athena_table.split('/')[2]
    prefix = '/'.join(s3_uri_athena_table.split('/')[3:])
    # Drop the Temp Snapshot table via Athena
    table = f'`{database}.{table}`'
    query = f' DROP TABLE {table} '
    athena_result_bucket = f'{s3_uri}/{account_id}/sagemaker/{region}'
    conn = connect(s3_staging_dir=athena_result_bucket,
                   region_name=region).cursor()
    conn.execute(query)
    if verbose:
        print(conn.fetchall())
        print(conn.total_execution_time_in_millis)
        print(conn.data_scanned_in_bytes)
    # Delete the underlying S3 files. Use a paginator: a single list_objects
    # call returns at most 1000 keys, so unpaginated listing would silently
    # leave files behind for large snapshots. delete_objects accepts at most
    # 1000 keys per request (one page) and rejects an empty key list, hence
    # the per-page guard.
    s3 = boto3.resource('s3')
    paginator = s3.meta.client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
        if keys:
            s3.meta.client.delete_objects(Bucket=bucket, Delete={'Objects': keys})