in src/aws-lambda/personalize-pre-create-campaigns/personalize-pre-create-campaigns.py [0:0]
def delete_campaign_polling(dataset_group_arn, solution_arn, **kwargs):
"""
For a particular solution Arn, remove the solution and all campaigns attached to it.
This function is meant to be called repeatedly (polling) till it returns True.
Args:
dataset_group_arn: Where to delete the campaign+solution
solution_arn: What solution to remove
kwargs: Other arguments which are ignored. This is to allow training configs to be passed around.
Returns:
Tuple - 1st item:
True if desired campaign has been deleted, or failed.
False otherwise.
2nd item:
Campaign Arn if a campaign has been scheduled for deletion
None otherwise
"""
finished = True
list_solutions_response = personalize.list_solutions(datasetGroupArn=dataset_group_arn)
list_campaigns_response = personalize.list_campaigns(solutionArn=solution_arn)
for campaign in list_campaigns_response['campaigns']:
finished = False
try:
logger.info(f"We are signalling that we do not want campaign with Arn {campaign['campaignArn']}")
personalize.delete_campaign(campaignArn=campaign['campaignArn'])
# Delete the SSM parameter if we have deleted a campaign
for ssm_param in campaign_type_to_ssm_param.values():
try:
test_campaign_arn = ssm.get_parameter(Name=ssm_param)['Parameter']['Value']
if campaign['campaignArn'].strip() == test_campaign_arn.strip():
logger.info(f"As campaign with Arn {campaign['campaignArn']} was configured in SSM parameter"
f" {ssm_param} but is to be deleted, we are removing it from SSM.")
response = ssm.put_parameter(
Name=ssm_param,
Description='Retail Demo Store Campaign Arn Parameter',
Value='NONE',
Type='String',
Overwrite=True
)
except ssm.exceptions.ParameterNotFound:
logger.info(f"No campaign recorded at {ssm_param}")
except personalize.exceptions.ResourceInUseException as ex:
logger.info(f"Campaign with Arn {campaign['campaignArn']} is still alive - waiting for it to change status "
f"so it can disappear")
if not finished:
return False
for solution in list_solutions_response['solutions']:
if solution['solutionArn'] == solution_arn:
finished = False
try:
logger.info(f"We are signalling that we do not want solution with Arn {solution_arn}")
personalize.delete_solution(solutionArn=solution_arn)
except personalize.exceptions.ResourceInUseException as ex:
logger.info(f"Campaign with Arn {solution['solutionArn']} is still alive "
f"- waiting for it to change status so it can disappear")
return finished