def pinpoint_fire_location_approached_event()

in src/aws-lambda/location-geofence-event/location-geofence-event.py [0:0]


def pinpoint_fire_location_approached_event(shopper_user_id, event_timestamp_iso=None, restrict_to_endpoint_types=None,
                                            restrict_number=None):
    """
    Fire a Pinpoint event, named by GEOFENCE_PINPOINT_EVENTTYPE, for the endpoints associated with a user.

    This enables campaigns based on location events. The user's endpoints are looked up in the Pinpoint
    application given by the `PinpointAppId` environment variable, optionally thinned out and filtered,
    and a single `put_events` call is made carrying one event per remaining endpoint.

    Args:
        shopper_user_id (str): Shopper User ID which Pinpoint uses internally.
        event_timestamp_iso (Optional[str]): If None the current timestamp is used, otherwise the event is
                                             stamped with this value.
        restrict_to_endpoint_types (Optional[list]): If not None, the channel types to send events for.
                                                     Endpoints without a 'ChannelType' are then skipped.
        restrict_number (Optional[int]): If not None, do not fire for more than this number of endpoints
                                         for each channel type/address pair. Note that endpoints whose
                                         status is not ACTIVE are only dropped when this is set.

    Returns:
        None.
    """
    pinpoint_app_id = os.environ['PinpointAppId']

    # Only the lookup call itself can raise NotFoundException, so keep the try body to that one line.
    try:
        response = pinpoint.get_user_endpoints(UserId=shopper_user_id, ApplicationId=pinpoint_app_id)
    except pinpoint.exceptions.NotFoundException:
        logger.warning(f"No endoints found for user {shopper_user_id} - no location event fire")
        return
    endpoints = response['EndpointsResponse']['Item']
    logger.info(f'Endpoints for {shopper_user_id} are {endpoints}')

    if restrict_number is not None:
        # Sometimes your Analytics may put unnecessary endpoints in to Pinpoint - for example, one for each session
        # but with the same address - so we cap how many endpoints per (channel type, address) get an event.
        kept = defaultdict(list)
        dropped_count = 0
        for endpoint in endpoints:
            key = (endpoint.get('ChannelType', 'unk'), endpoint.get('Address', 'unk'))
            # 'EndpointStatus' is read defensively, like the other optional fields: a missing or null status
            # is treated as "not active" instead of raising and aborting the event fire for the whole user.
            is_active = str(endpoint.get('EndpointStatus') or '').upper() == 'ACTIVE'
            if is_active and len(kept[key]) < restrict_number:
                kept[key].append(endpoint)
            else:
                dropped_count += 1
                logger.info(f"Dropping endpoint with Id {endpoint['Id']}")
        if dropped_count:
            logger.info(f"Dropped {dropped_count} endpoints.")
        # Flatten the per-address groups back into a single list, preserving first-seen order.
        endpoints = [endpoint for group in kept.values() for endpoint in group]

    endpoint_ids = [endpoint['Id'] for endpoint in endpoints
                    if restrict_to_endpoint_types is None
                    or endpoint.get('ChannelType') in restrict_to_endpoint_types]

    # NOTE(review): datetime.now() carries no timezone offset - presumably fine if the runtime clock is UTC;
    # confirm before relying on these timestamps across regions.
    timestamp = datetime.now().isoformat() if event_timestamp_iso is None else event_timestamp_iso

    # One session id is shared by every event fired in this invocation.
    sess_id = str(uuid.uuid4())

    events = {
        endpoint_id: {
            'Endpoint': {},  # We need to provide this but empty because not here to update endpoint
            'Events': {
                endpoint_id: {  # API docs state this is the endpoint ID too
                    'EventType': GEOFENCE_PINPOINT_EVENTTYPE,
                    'Session': {'Id': sess_id, 'StartTimestamp': timestamp},  # required
                    'Timestamp': timestamp,  # required
                },
            },
        }
        for endpoint_id in endpoint_ids
    }

    if not events:
        logger.warning(f'Did not fire any location events for user {shopper_user_id} in app {pinpoint_app_id}')
        return

    logger.info(f'Firing Pinpoint events: {events}')
    pinpoint.put_events(
        ApplicationId=pinpoint_app_id,
        EventsRequest={
            'BatchItem': events
        }
    )