def update()

in src/aws-lambda/personalize-pre-create-campaigns/personalize-pre-create-campaigns.py


def update():
    """
    According to the contents of the SSM parameter retaildemostore-training-config, build and delete
    dataset groups, datasets, solutions and campaigns to reach the desired state.

    The function runs off the training config which has the following JSON structure:
        - Dictionary with one key "steps", which represents sequential configurations to be achieved.
        - Its value is a list of dictionaries, each representing a Personalize config to aspire to.
          Once a step is finished the system moves on to the next step.
        - Each step's Personalize config dictionary maps the name of each dataset group to create
          to the config for that dataset group.
        - Each dataset group config carries a "campaigns" key (alongside the "datasets", "filters"
          and "tracker" keys consumed by the code below) whose value is a dictionary mapping
          campaign type (e.g. user-item, item-item, reranking) to the campaign config for that type.
        - The campaign config is a dictionary with 3 keys (illustrated in the fragment below):
            - "desired_campaign_suffixes" - a list of numerical version numbers of solutions and campaigns to create.
              For example, if the campaign type is "retaildemostore-related-products" and the desired version numbers are
              [3], it will attempt to create a related products campaign named retaildemostore-related-products-3.
            - "desired_active_version_suffixes" - an int indicating which of these version numbers should be active
              in the UI. This is achieved by writing the campaign's ARN to the right SSM parameter so that it is
              picked up by the recommendations endpoint.
            - "minimum_available_campaigns" - if 0, campaigns can get deleted even if that means there will be
              no active campaigns. If 1, a campaign of this type is preserved until another campaign of this type exists.

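        For illustration only (the names and values here are hypothetical), a single campaign
        config using all three keys might look like:

            "retaildemostore-related-products": {
                "desired_campaign_suffixes": [1, 2],
                "desired_active_version_suffixes": 2,
                "minimum_available_campaigns": 1
            }
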
        See the poll_create function below for how the default training configuration is constructed.

        As another (abridged) example, the following config requests a full teardown
        followed by creation of a dataset group with two campaigns (a full config also
        carries "datasets", "filters" and "tracker" entries for each dataset group):

            {
                "steps": [
                    {
                        "dataset_groups": null
                    },
                    {
                        "dataset_groups": {
                            "retaildemostore-MYDATASETGROUP": {
                                "campaigns": {
                                    "retaildemostore-related-products": {
                                        "desired_campaign_suffixes": [0],
                                        "desired_active_version_suffixes": 0
                                    },
                                    "retaildemostore-product-personalization": {
                                        "desired_campaign_suffixes": [0],
                                        "desired_active_version_suffixes": 0
                                    }
                                }
                            }
                        }
                    }
                ]
            }
    """

    # The training config has already been written to SSM - grab it
    # (see it documented in the poll_create function below).
    train_configs = ssm.get_parameter(Name=training_config_param_name)
    train_configs = json.loads(train_configs['Parameter']['Value'])

    trainstep_config = train_configs['steps'][0]
    logger.info(f"Got train config: {json.dumps(trainstep_config, indent=2)}")

    try:
        train_state = ssm.get_parameter(Name=training_state_param_name)
        train_state = json.loads(train_state['Parameter']['Value'])
    except (ssm.exceptions.ParameterNotFound, json.JSONDecodeError):
        train_state = {'dataset_groups': [], 'schema': []}
    logger.info(f"Current train state: {json.dumps(train_state, indent=2)}")

    # Find all dataset groups in the region
    response = personalize.list_dataset_groups()
    dataset_groups = response['datasetGroups']
    dataset_group_name_to_arn = {dataset_group['name']: dataset_group['datasetGroupArn']
                                 for dataset_group in dataset_groups}

    # Find the dataset group names we control
    all_dataset_group_names = train_state['dataset_groups']

    # Group them into ones we want and ones we do not want
    desired_dataset_group_names = ([] if trainstep_config['dataset_groups'] is None
                                   else list(trainstep_config['dataset_groups'].keys()))
    undesired_dataset_group_names = [name for name in all_dataset_group_names
                                     if name not in desired_dataset_group_names]
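    # e.g. if we control ['A', 'B'] and this step only wants ['A'],
    # then 'B' is undesired and will be torn down further below.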

    all_deleted = True
    all_created = True

    schemas = {}

    if len(desired_dataset_group_names) > 0:
        # We want to create some dataset groups, so we'll need the IAM role
        # (the schemas are created per dataset further below)
        role_arn = create_personalize_role(role_name)
        if not role_arn:
            logger.info('Waiting for IAM role to be consistent')
            return False

    for dataset_group_name in desired_dataset_group_names:

        all_created_dataset_group = True
        dataset_group_arn = dataset_group_name_to_arn.get(dataset_group_name, None)

        # Create dataset group if it doesn't exist and save the name in an SSM param
        if dataset_group_arn is None:

            # take ownership of this dataset group
            train_state['dataset_groups'] = list(set(train_state['dataset_groups']) | {dataset_group_name})
            response = ssm.put_parameter(
                Name=training_state_param_name,
                Description='Retail Demo Store Train State (controlled dataset groups)',
                Value=json.dumps(train_state),
                Type='String',
                Overwrite=True
            )
            logger.info(f'New train state: {json.dumps(train_state)}')

            logger.info(f'Generating a dataset group with unique name {dataset_group_name}')
            create_dataset_group_response = personalize.create_dataset_group(name=dataset_group_name)
            dataset_group_arn = create_dataset_group_response['datasetGroupArn']

            # take ownership of the dataset group's event schema
            train_state['schema'] = list(set(train_state['schema']) | {dataset_group_name+'-event-schema'})
            response = ssm.put_parameter(
                Name=training_state_param_name,
                Description='Retail Demo Store Train State (controlled dataset groups)',
                Value=json.dumps(train_state),
                Type='String',
                Overwrite=True
            )

        describe_dataset_group_response = personalize.describe_dataset_group(
            datasetGroupArn=dataset_group_arn
        )

        status = describe_dataset_group_response["datasetGroup"]["status"]
        logger.info("DatasetGroup: {}".format(status))

        if status == "CREATE FAILED":
            logger.error(f'DatasetGroup {dataset_group_name} '
                         f'create failed: {json.dumps(describe_dataset_group_response)}')
            return False  # Everything will hang on this step

        # Come back on a later poll; wait until the dataset group is active.
        if status != "ACTIVE":
            logger.info(f'DatasetGroup {dataset_group_name} not active yet')
            all_created = False
            continue

        datasets_config = trainstep_config['dataset_groups'][dataset_group_name]['datasets']

        dataset_arns = {}
        import_job_arns = {}

        for dataset_type, dataset_detail in datasets_config.items():

            dataset_filename = dataset_detail['filename']

            # Conditionally create schemas
            schemas[dataset_type] = create_schema(dataset_detail['schema']['avro'],
                                                  dataset_detail['schema']['name'])

            # take ownership of this schema (interactions, items or users)
            train_state['schema'] = list(set(train_state['schema']) | {dataset_detail['schema']['name']})
            response = ssm.put_parameter(
                Name=training_state_param_name,
                Description='Retail Demo Store Train State (controlled dataset groups)',
                Value=json.dumps(train_state),
                Type='String',
                Overwrite=True
            )

            dataset_name = dataset_group_name + '-' + dataset_type
            dataset_arns[dataset_type] = create_dataset(dataset_group_arn, dataset_name,
                                                        dataset_type, schemas[dataset_type])

            dataset_import_job_name = dataset_name+'-import'
            s3_filename = "s3://{}/{}".format(bucket, dataset_filename)
            import_job_arns[dataset_type] = create_import_job(dataset_import_job_name,
                                                             dataset_arns[dataset_type], account_id, region,
                                                             s3_filename, role_arn)

        # Make sure all import jobs are done/active before continuing
        for arn in import_job_arns.values():
            if not is_import_job_active(arn):
                logger.info(f"Import job {arn} is NOT active yet")
                all_created = False
                all_created_dataset_group = False

        if not all_created_dataset_group:
            continue

        # Create related product, product recommendation, and rerank solutions if they don't exist.
        # Start by calculating which recipes to use, with which names and event types, and whether
        # we want each campaign activated.
        campaigns_config = trainstep_config['dataset_groups'][dataset_group_name]['campaigns']
        augmented_train_config = []
        for campaign_type, campaign_train_config in campaigns_config.items():

            for campaign_no in campaign_train_config['desired_campaign_suffixes']:

                config_for_campaign = dict(
                    recipe_arn=campaign_type_to_recipe_arn[campaign_type],
                    campaign_solution_name=campaign_type + '-' + str(campaign_no),
                    event_type=campaign_type_to_event_type[campaign_type],
                    activate_please=campaign_no == campaign_train_config['desired_active_version_suffixes'],
                    active_arn_param_name=campaign_type_to_ssm_param[campaign_type],
                    campaign_type=campaign_type)

                augmented_train_config += [config_for_campaign]

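        # Each augmented entry bundles everything create_campaign_polling needs. An entry might
        # look like this (values are illustrative; the real ones come from the campaign_type_to_*
        # lookup tables used above):
        #   {"recipe_arn": "arn:aws:personalize:::recipe/aws-sims",
        #    "campaign_solution_name": "retaildemostore-related-products-1",
        #    "event_type": "ProductViewed",
        #    "activate_please": True,
        #    "active_arn_param_name": "retaildemostore-related-products-campaign-arn",
        #    "campaign_type": "retaildemostore-related-products"}
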
        # Train up any campaigns that may be missing and set their SSM parameters
        logger.info(f"Set up to train with augmented training config: {json.dumps(augmented_train_config, indent=4)}")
        for train_job in augmented_train_config:
            logger.info(f'Polling training job {train_job}')
            campaign_arn = create_campaign_polling(dataset_group_arn=dataset_group_arn,
                                                   **train_job)
            if campaign_arn is not None and train_job['activate_please']:
                logger.info(f"Setting campaignArn {campaign_arn} as system parameter"
                            f" for {train_job['active_arn_param_name']} which has finished")
                # Finally, set the campaign arn as the system parameter expected by services
                response = ssm.put_parameter(
                    Name=train_job['active_arn_param_name'],
                    Description='Retail Demo Store Campaign Arn Parameter',
                    Value='{}'.format(campaign_arn),
                    Type='String',
                    Overwrite=True
                )
            all_created_dataset_group = all_created_dataset_group and campaign_arn is not None
            all_created = all_created and all_created_dataset_group

        # Now we will go through the existing solutions and remove any we don't want
        list_solutions_response = personalize.list_solutions(datasetGroupArn=dataset_group_arn)
        all_desired_solution_names = [d['campaign_solution_name'] for d in augmented_train_config]

        for solution in sorted(list_solutions_response['solutions'], key=lambda x: x['name']):
            if solution['name'] in all_desired_solution_names:
                continue  # Keep this one because we have been configured to build it

            logger.info(f"Solution {solution['name']} with Arn {solution['solutionArn']} is unwanted. "
                        "We will try to remove it.")

            deleted_one = delete_campaign_polling(
                dataset_group_arn=dataset_group_arn,
                solution_arn=solution['solutionArn'])
            all_deleted = all_deleted and deleted_one

        # Create recent product purchase and category filter, if necessary
        # (or whatever filters have been configured)
        filters_config = trainstep_config['dataset_groups'][dataset_group_name]['filters']
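        # Each filter config entry carries the keys consumed below, for example (values are
        # illustrative only, using Amazon Personalize filter expression syntax):
        #   {"arn_param": "retaildemostore-filter-purchased-arn",
        #    "filter_name": "retaildemostore-filter-purchased-products",
        #    "filter_expression": 'EXCLUDE itemId WHERE interactions.event_type in ("OrderCompleted")'}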
        if filters_config is not None:
            for filter_config in filters_config:
                create_filter(dataset_group_arn=dataset_group_arn,
                              arn_param=filter_config['arn_param'],
                              filter_name=filter_config['filter_name'],
                              filter_expression=filter_config['filter_expression'])

        tracker_config = trainstep_config['dataset_groups'][dataset_group_name]['tracker']
        if tracker_config:
            list_event_trackers_response = personalize.list_event_trackers(datasetGroupArn=dataset_group_arn)
            # Only act on the tracker once everything else in this dataset group is created
            if all_created:

                # Either hasn't been created yet or isn't active yet.
                if len(list_event_trackers_response['eventTrackers']) == 0:
                    logger.info('Event Tracker does not exist; creating')
                    event_tracker = personalize.create_event_tracker(
                        datasetGroupArn=dataset_group_arn,
                        name='retaildemostore-event-tracker'
                    )

                    if event_tracker.get('trackingId'):
                        event_tracking_id = event_tracker['trackingId']
                        logger.info('Setting event tracking ID {} as SSM parameter'.format(event_tracking_id))

                        ssm.put_parameter(
                            Name=event_tracking_id_param,
                            Description='Retail Demo Store Personalize Event Tracker ID Parameter',
                            Value='{}'.format(event_tracking_id),
                            Type='String',
                            Overwrite=True
                        )
                        # Trigger rebuild of Web UI service so event tracker gets picked up.
                        rebuild_webui_service(region, account_id)

                    return False  # Give event tracker a moment to get ready
                else:
                    event_tracker = list_event_trackers_response['eventTrackers'][0]
                    logger.info("Event Tracker: {}".format(event_tracker['status']))

                    if event_tracker['status'] == 'CREATE FAILED':
                        logger.error('Event tracker create failed: {}'.format(json.dumps(event_tracker)))
                        return False

    # Now go through dataset groups getting rid of any we do not need.
    for dataset_group_name in undesired_dataset_group_names:

        dataset_group_arn = dataset_group_name_to_arn.get(dataset_group_name, None)

        if dataset_group_arn is not None:

            all_deleted = False
            # Note that the delete may fail while campaigns and solutions are still attached to it,
            # so we try to remove them first
            list_solutions_response = personalize.list_solutions(datasetGroupArn=dataset_group_arn)
            for solution in list_solutions_response['solutions']:
                _ = delete_campaign_polling(
                        dataset_group_arn=dataset_group_arn,
                        solution_arn=solution['solutionArn'])

            if len(list_solutions_response['solutions']) > 0:
                logger.info(f"We do not need this dataset group {dataset_group_arn} but it still has solutions.")
            else:
                logger.info(f"We do not need this dataset group {dataset_group_arn}. Let us take it down.")
                # Nothing except the dataset group depends on the filters, so delete them first.
                delete_filters(dataset_group_arn)
                if delete_event_trackers(dataset_group_arn):
                    logger.info('EventTrackers fully deleted')
                    if delete_datasets(dataset_group_arn):
                        logger.info('Datasets fully deleted')
                        if delete_dataset_group(dataset_group_arn):
                            logger.info('DatasetGroup fully deleted')

    if len(desired_dataset_group_names) == 0:
        all_deleted = all_deleted and delete_personalize_schemas(train_state['schema'])
        all_deleted = all_deleted and delete_personalize_role()

    if all_created and all_deleted:
        # No need for this lambda function to be called anymore, so disable the CW event rule that
        # has been calling us. If something wants this functionality again, it just re-enables the rule.
        # Alternatively, if we have been configured with a multi-step config, move on to the next step.
        msg = ('Related product, product recommendation and personalized reranking campaigns '
               'fully provisioned, and unwanted campaigns removed.')
        logger.info(msg)
        if len(train_configs['steps'])>1:
            train_configs['steps'] = train_configs['steps'][1:]
            ssm.put_parameter(
                Name=training_config_param_name,
                Description='Retail Demo Store Training Config',
                Value=json.dumps(train_configs, indent=3),
                Type='String',
                Overwrite=True
            )
            logger.info(f" - Popping the training config that just succeded. "
                        f"New training config: {json.dumps(train_configs, indent=2)}")
            logger.info(msg)
            return False
        else:
            logger.info("Finished polling.")
            return True
    else:
        if all_created:
            msg = 'Still trying to clean up some things.'
        elif all_deleted:
            msg = "Still trying to provision something."
        else:
            msg = "Still trying to provision and delete things."
        logger.info(msg)
        return False
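

# A minimal sketch, not part of the original module, of how update() might be driven.
# It assumes a CloudWatch Events rule periodically re-invokes this Lambda until update()
# reports completion, as the comments above describe. The handler name and the rule
# name below are hypothetical.
def example_lambda_handler(event, context):
    import boto3  # boto3 is already a dependency of this module

    done = update()
    if done:
        # Everything provisioned and cleaned up: stop the polling loop.
        # Re-enabling the rule would restart this workflow.
        cw_events = boto3.client('events')
        cw_events.disable_rule(Name='RetailDemoStorePersonalizePreCreate')  # hypothetical rule name
    return done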