in 07-module-feature-monitoring/feature_monitoring_utils.py [0:0]
def create_fg_snapshot_ctas(fg_name, verbose):
    """Create a CTAS snapshot table with the latest live record per identifier.

    Looks up the feature group's offline-store catalog entry, then runs a
    ``CREATE TABLE ... AS SELECT`` that keeps, for every value of the record
    identifier feature, only the top-ranked row (ordered by event time,
    ``api_invocation_time`` and ``write_time``, all descending) and drops rows
    flagged ``is_deleted``.

    Relies on names defined outside this function: ``FeatureGroup``,
    ``feature_store_session``, ``account_id``, ``region``,
    ``ctas_table_suffix`` and ``connect``.

    Args:
        fg_name: Name of the feature group to snapshot.
        verbose: When truthy, also print the query result and the execution
            statistics exposed by the cursor.

    Returns:
        The name of the target table: the source table name followed by
        ``ctas_table_suffix``.

    Raises:
        KeyError: If the describe response lacks the offline-store, data
            catalog, record-identifier or event-time entries read below.
    """
    fg = FeatureGroup(name=fg_name, sagemaker_session=feature_store_session)

    # Describe once and reuse the response instead of re-querying per field
    # (each describe() call is presumably a service round trip).
    description = fg.describe()
    offline_store = description['OfflineStoreConfig']
    catalog = offline_store['DataCatalogConfig']

    s3_uri = offline_store['S3StorageConfig']['S3Uri']
    database_name = catalog['Database']
    table_name_source = catalog['TableName']
    fg_unique_id = description['RecordIdentifierFeatureName']
    fg_event_time = description['EventTimeFeatureName']

    fg_s3_url = f'{s3_uri}/{account_id}/sagemaker/{region}/offline-store/{table_name_source}'
    print(f"Feature Group S3 URL: {fg_s3_url}")
    print(f"Feature Group Table Name: {table_name_source}")

    table_name_target = f'{table_name_source}{ctas_table_suffix}'
    target_table = f'"{database_name}"."{table_name_target}"'
    source_table = f'"{database_name}"."{table_name_source}"'

    # row_num = 1 selects the top-ranked row per record identifier.
    # NOTE(review): the identifier and event-time column names are
    # interpolated unquoted, so they must be valid bare SQL identifiers.
    query = (
        f' CREATE TABLE {target_table} AS '
        f' SELECT * '
        f' FROM (SELECT *, row_number() '
        f' OVER (PARTITION BY {fg_unique_id} '
        f' ORDER BY {fg_event_time} desc, api_invocation_time DESC, write_time DESC) AS row_num '
        f' FROM {source_table}) '
        f' WHERE row_num = 1 and NOT is_deleted '
    )

    # Query results are staged under the feature group's own S3 prefix.
    athena_result_bucket = f'{s3_uri}/{account_id}/sagemaker/{region}'
    connection = connect(s3_staging_dir=athena_result_bucket,
                         region_name=region)
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            if verbose:
                print(cursor.fetchall())
                print("Total execution time in millis: ", cursor.total_execution_time_in_millis)
                print("Total data scanned in bytes: ", cursor.data_scanned_in_bytes)
        finally:
            # Release the cursor even when the query or the reporting fails.
            cursor.close()
    finally:
        connection.close()

    # NOTE(review): printed regardless of `verbose`, like the URL and table
    # name messages above -- confirm this matches the intended behavior.
    print (f"CTAS table created successfully: {table_name_target}")
    return table_name_target