# src/aws-lambda/personalize-pre-create-campaigns/personalize-pre-create-campaigns.py
def update():
"""
    According to the contents of the SSM parameter retaildemostore-training-config, build and
    delete dataset groups, datasets, solutions and campaigns to reach the desired state.
    The function runs off the training config, which has the following JSON structure:
        - A dictionary with one key, "steps", representing sequential configurations to be achieved.
        - Its value is a list of dictionaries, each representing a Personalize config to aspire to.
          Once a step is finished, the system moves on to the next step.
        - Each step's Personalize config dictionary maps the name of each dataset group to create
          to the config for that dataset group.
        - Each dataset group config carries the keys "datasets", "campaigns", "filters" and
          "tracker". The value of "campaigns" is a dictionary keyed by campaign type
          (e.g. user-item, item-item, reranking), with the campaign config for that campaign
          type as its value.
        - The campaign config is a dictionary with 3 keys:
            - "desired_campaign_suffixes" - a list of numerical version numbers of solutions and
              campaigns to create. For example, if the campaign type is
              "retaildemostore-related-products" and the desired version numbers are [3], it will
              attempt to create a related products campaign named retaildemostore-related-products-3.
            - "desired_active_version_suffixes" - an int indicating which of these version numbers
              should be active in the UI. This is achieved by writing the campaign's ARN into the
              right SSM parameter so that it is picked up by the recommendations endpoint.
            - "minimum_available_campaigns" - if 0, campaigns may be deleted even if that leaves
              no active campaign of this type; if 1, a campaign of this type is preserved until
              another campaign of this type is available.
    See the poll_create function below for how the default training configuration is constructed.
As another example, the following config requests a full teardown
followed by creation of a dataset group with two campaigns:
        {
            "steps": [
                {
                    "dataset_groups": null
                },
                {
                    "dataset_groups": {
                        "retaildemostore-MYDATASETGROUP": {
                            "campaigns": {
                                "retaildemostore-related-products": {
                                    "desired_campaign_suffixes": [0],
                                    "desired_active_version_suffixes": 0
                                },
                                "retaildemostore-product-personalization": {
                                    "desired_campaign_suffixes": [0],
                                    "desired_active_version_suffixes": 0
                                }
                            }
                        }
                    }
                }
            ]
        }
"""
    # Grab the training config from SSM - it is documented in the docstring above and in the poll_create function below.
train_configs = ssm.get_parameter(Name=training_config_param_name)
train_configs = json.loads(train_configs['Parameter']['Value'])
trainstep_config = train_configs['steps'][0]
logger.info(f"Got train config: {json.dumps(trainstep_config, indent=2)}")
try:
train_state = ssm.get_parameter(Name=training_state_param_name)
train_state = json.loads(train_state['Parameter']['Value'])
except (ssm.exceptions.ParameterNotFound, json.JSONDecodeError) as e:
train_state = {'dataset_groups': [], 'schema': []}
logger.info(f"Current train state: {json.dumps(train_state, indent=2)}")
    # Find all dataset groups in the region
response = personalize.list_dataset_groups()
datasetGroups = response['datasetGroups']
dataset_group_name_to_arn = {datasetGroup['name']: datasetGroup['datasetGroupArn'] for datasetGroup in
datasetGroups}
# Find dataset group names I control
all_dataset_group_names = train_state['dataset_groups']
# group them into ones we want and ones we do not want
desired_dataset_group_names = [] if trainstep_config['dataset_groups'] is None else list(trainstep_config['dataset_groups'].keys())
undesired_dataset_group_names = [name for name in all_dataset_group_names if name not in desired_dataset_group_names]
all_deleted = True
all_created = True
schemas = {}
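    # all_created / all_deleted start optimistic and are knocked False by anything still pending
    # below; only when both survive the pass is the step reported as finished. schemas collects
    # the schema ARNs returned by create_schema, keyed by dataset type.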
if len(desired_dataset_group_names) > 0:
        # We want to create some dataset groups, so we'll need the IAM role (dataset schemas are created per dataset further below)
role_arn = create_personalize_role(role_name)
if not role_arn:
logger.info('Waiting for IAM role to be consistent')
return False
for dataset_group_name in desired_dataset_group_names:
all_created_dataset_group = True
dataset_group_arn = dataset_group_name_to_arn.get(dataset_group_name, None)
# Create dataset group if it doesn't exist and save the name in an SSM param
if dataset_group_arn is None:
# take ownership of this dataset group
train_state['dataset_groups'] = list(set(train_state['dataset_groups']) | {dataset_group_name})
response = ssm.put_parameter(
Name=training_state_param_name,
Description='Retail Demo Store Train State (controlled dataset groups)',
Value=json.dumps(train_state),
Type='String',
Overwrite=True
)
logger.info(f'New train state: {json.dumps(train_state)}')
logger.info(f'Generating a dataset group with unique name {dataset_group_name}')
create_dataset_group_response = personalize.create_dataset_group(name=dataset_group_name)
dataset_group_arn = create_dataset_group_response['datasetGroupArn']
# take ownership of the dataset group's event schema
train_state['schema'] = list(set(train_state['schema']) | {dataset_group_name+'-event-schema'})
response = ssm.put_parameter(
Name=training_state_param_name,
Description='Retail Demo Store Train State (controlled dataset groups)',
Value=json.dumps(train_state),
Type='String',
Overwrite=True
)
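            # Note the pattern: ownership is persisted to SSM *before* each resource is created,
            # so a crash between the two calls cannot leave an orphan behind.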
describe_dataset_group_response = personalize.describe_dataset_group(
datasetGroupArn=dataset_group_arn
)
status = describe_dataset_group_response["datasetGroup"]["status"]
logger.info("DatasetGroup: {}".format(status))
if status == "CREATE FAILED":
logger.error(f'DatasetGroup {dataset_group_name} '
f'create failed: {json.dumps(describe_dataset_group_response)}')
return False # Everything will hang on this step
        # Come back on the next poll until the dataset group is active.
if status != "ACTIVE":
logger.info(f'DatasetGroup {dataset_group_name} not active yet')
all_created = False
continue
datasets_config = trainstep_config['dataset_groups'][dataset_group_name]['datasets']
dataset_arns = {}
import_job_arns = {}
for dataset_type, dataset_detail in datasets_config.items():
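            # dataset_detail is expected to provide "filename" (an object key in the staging
            # bucket) plus "schema" with "name" and "avro" fields, e.g. (illustrative values):
            #   {"filename": "items.csv", "schema": {"name": "retaildemostore-schema-items", "avro": {...}}}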
dataset_filename = dataset_detail['filename']
# Conditionally create schemas
schemas[dataset_type] = create_schema(dataset_detail['schema']['avro'],
dataset_detail['schema']['name'])
# take ownership of this schema (interactions, items or users)
train_state['schema'] = list(set(train_state['schema']) | {dataset_detail['schema']['name']})
response = ssm.put_parameter(
Name=training_state_param_name,
Description='Retail Demo Store Train State (controlled dataset groups)',
Value=json.dumps(train_state),
Type='String',
Overwrite=True
)
dataset_name = dataset_group_name + '-' + dataset_type
dataset_arns[dataset_type] = create_dataset(dataset_group_arn, dataset_name,
dataset_type, schemas[dataset_type])
dataset_import_job_name = dataset_name+'-import'
s3_filename = "s3://{}/{}".format(bucket, dataset_filename)
import_job_arns[dataset_type] = create_import_job(dataset_import_job_name,
dataset_arns[dataset_type], account_id, region,
s3_filename, role_arn)
# Make sure all import jobs are done/active before continuing
for arn in import_job_arns.values():
if not is_import_job_active(arn):
logger.info(f"Import job {arn} is NOT active yet")
all_created = False
all_created_dataset_group = False
if not all_created_dataset_group:
continue
        # Create related product, product recommendation, and rerank solutions if they don't exist.
        # Start by calculating which recipes to use, with which names and event types, and whether each campaign should be activated.
campaigns_config = trainstep_config['dataset_groups'][dataset_group_name]['campaigns']
augmented_train_config = []
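        # Each augmented entry binds one desired campaign suffix to everything needed to build
        # and activate it. An entry looks like (illustrative values):
        #   {"recipe_arn": "arn:aws:personalize:::recipe/aws-sims",
        #    "campaign_solution_name": "retaildemostore-related-products-1",
        #    "event_type": "...", "activate_please": True,
        #    "active_arn_param_name": "...", "campaign_type": "retaildemostore-related-products"}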
for campaign_type, campaign_train_config in campaigns_config.items():
for campaign_no in campaign_train_config['desired_campaign_suffixes']:
config_for_campaign = dict(
recipe_arn=campaign_type_to_recipe_arn[campaign_type],
campaign_solution_name=campaign_type + '-' + str(campaign_no),
event_type=campaign_type_to_event_type[campaign_type],
activate_please=campaign_no == campaign_train_config['desired_active_version_suffixes'],
active_arn_param_name=campaign_type_to_ssm_param[campaign_type],
campaign_type=campaign_type)
augmented_train_config += [config_for_campaign]
        # Train up any campaigns that may be missing and set their SSM parameters
logger.info(f"Set up to train with augmented training config: {json.dumps(augmented_train_config, indent=4)}")
for train_job in augmented_train_config:
logger.info(f'Polling training job {train_job}')
campaign_arn = create_campaign_polling(dataset_group_arn=dataset_group_arn,
**train_job)
if campaign_arn is not None and train_job['activate_please']:
logger.info(f"Setting campaignArn {campaign_arn} as system parameter"
f" for {train_job['active_arn_param_name']} which has finished")
# Finally, set the campaign arn as the system parameter expected by services
response = ssm.put_parameter(
Name=train_job['active_arn_param_name'],
Description='Retail Demo Store Campaign Arn Parameter',
Value='{}'.format(campaign_arn),
Type='String',
Overwrite=True
)
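                # Downstream services resolve the live campaign by reading this parameter, so
                # flipping it here is what makes a campaign version "active" in the UI.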
all_created_dataset_group = all_created_dataset_group and campaign_arn is not None
all_created = all_created and all_created_dataset_group
# Now we will go through the existing solutions and remove any we don't want
list_solutions_response = personalize.list_solutions(datasetGroupArn=dataset_group_arn)
all_desired_solution_names = [d['campaign_solution_name'] for d in augmented_train_config]
# We go through the existing solutions and delete ones that are not in the list of desired solutions
for solution in sorted(list_solutions_response['solutions'], key=lambda x:x['name']):
if solution['name'] in all_desired_solution_names:
pass # We can keep this one because we have been configured to build it
else:
logger.info(f"Solution {solution['name']} with Arn {solution['solutionArn']} is unwanted. "
"We will try to remove it.")
deleted_one = delete_campaign_polling(
dataset_group_arn=dataset_group_arn,
solution_arn=solution['solutionArn'])
all_deleted = all_deleted and deleted_one
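                # delete_campaign_polling is assumed to tear down the campaign before its
                # solution and to return True only once both are gone, keeping all_deleted honest.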
# Create recent product purchase and category filter, if necessary
# (or whatever filters have been configured)
filters_config = trainstep_config['dataset_groups'][dataset_group_name]['filters']
if filters_config is not None:
for filter_config in filters_config:
create_filter(dataset_group_arn=dataset_group_arn,
arn_param=filter_config['arn_param'],
filter_name=filter_config['filter_name'],
filter_expression=filter_config['filter_expression'])
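                # create_filter is assumed to be idempotent here: a no-op when a filter with
                # this name already exists, recording its ARN under arn_param either way.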
tracker_config = trainstep_config['dataset_groups'][dataset_group_name]['tracker']
if tracker_config:
list_event_trackers_response = personalize.list_event_trackers(datasetGroupArn=dataset_group_arn)
            if len(list_event_trackers_response['eventTrackers']) == 0:
                # Hasn't been created yet; only create it once everything else is in place.
                if all_created:
                    logger.info('Event Tracker does not exist; creating')
                    event_tracker = personalize.create_event_tracker(
                        datasetGroupArn=dataset_group_arn,
                        name='retaildemostore-event-tracker'
                    )
                    if event_tracker.get('trackingId'):
                        event_tracking_id = event_tracker['trackingId']
                        logger.info('Setting event tracking ID {} as SSM parameter'.format(event_tracking_id))
                        ssm.put_parameter(
                            Name=event_tracking_id_param,
                            Description='Retail Demo Store Personalize Event Tracker ID Parameter',
                            Value='{}'.format(event_tracking_id),
                            Type='String',
                            Overwrite=True
                        )
                    # Trigger rebuild of Web UI service so the event tracker gets picked up.
                    rebuild_webui_service(region, account_id)
                    return False  # Give the event tracker a moment to get ready
            else:
                event_tracker = list_event_trackers_response['eventTrackers'][0]
                logger.info("Event Tracker: {}".format(event_tracker['status']))
                if event_tracker['status'] == 'CREATE FAILED':
                    logger.error('Event tracker create failed: {}'.format(json.dumps(event_tracker)))
                    return False
# Now go through dataset groups getting rid of any we do not need.
for dataset_group_name in undesired_dataset_group_names:
dataset_group_arn = dataset_group_name_to_arn.get(dataset_group_name, None)
if dataset_group_arn is not None:
all_deleted = False
            # Note that it may not delete cleanly if there are campaigns and solutions
            # attached to it, so try to remove those first.
list_solutions_response = personalize.list_solutions(datasetGroupArn=dataset_group_arn)
for solution in list_solutions_response['solutions']:
_ = delete_campaign_polling(
dataset_group_arn=dataset_group_arn,
solution_arn=solution['solutionArn'])
if len(list_solutions_response['solutions'])>0:
logger.info(f"We do not need this dataset group {dataset_group_arn} but it still has solutions.")
else:
logger.info(f"We do not need this dataset group {dataset_group_arn}. Let us take it down.")
                # Nothing but the dataset group depends on the filters, so delete them first.
delete_filters(dataset_group_arn)
if delete_event_trackers(dataset_group_arn):
logger.info('EventTrackers fully deleted')
if delete_datasets(dataset_group_arn):
logger.info('Datasets fully deleted')
if delete_dataset_group(dataset_group_arn):
logger.info('DatasetGroup fully deleted')
if len(desired_dataset_group_names) == 0:
all_deleted = all_deleted and delete_personalize_schemas(train_state['schema'])
all_deleted = all_deleted and delete_personalize_role()
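    # Shared resources (the schemas and the IAM role) are torn down only on a full teardown,
    # i.e. when no dataset groups are desired at all, since any dataset group may reference them.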
if all_created and all_deleted:
        # No need for this Lambda function to be called anymore, so disable the CloudWatch event
        # rule that has been calling us; if something wants this functionality again, it just
        # re-enables the rule. Alternatively, if we have been configured with a multi-step
        # config, move on to the next step.
        msg = ('Related product, product recommendation and personalized reranking campaigns '
               'fully provisioned, and unwanted campaigns removed.')
logger.info(msg)
if len(train_configs['steps'])>1:
train_configs['steps'] = train_configs['steps'][1:]
ssm.put_parameter(
Name=training_config_param_name,
Description='Retail Demo Store Training Config',
Value=json.dumps(train_configs, indent=3),
Type='String',
Overwrite=True
)
logger.info(f" - Popping the training config that just succeded. "
f"New training config: {json.dumps(train_configs, indent=2)}")
logger.info(msg)
return False
else:
logger.info("Finished polling.")
return True
else:
if all_created:
msg = 'Still trying to clean up some things.'
elif all_deleted:
msg = "Still trying to provision something."
else:
msg = "Still trying to provision and delete things."
logger.info(msg)
return False