in src/aws-lambda/location-geofence-event/location-geofence-event.py [0:0]
def pinpoint_fire_location_approached_event(shopper_user_id, event_timestamp_iso=None, restrict_to_endpoint_types=None,
restrict_number=None):
"""
We fire an event in Pinpoint with name parametrised by GEOFENCE_PINPOINT_EVENTTYPE for all endpoints
associated with this user. This enables campaigns based on location events.
Args:
shopper_user_id (str): Shopper User ID which Pinpoint uses internally.
event_timestamp_iso (Optional[str]): If null current timestamp is used otherwise stamps the event with this.
restrict_to_endpoint_types (Optional[list]): If not None, the channel types to send events for.
restrict_number (int): Do not fire for more than more than this number of endpoints
for each channel type/address.
Returns:
None.
"""
pinpoint_app_id = os.environ['PinpointAppId']
try:
endpoints = pinpoint.get_user_endpoints(UserId=shopper_user_id, ApplicationId=pinpoint_app_id)
endpoints = endpoints['EndpointsResponse']['Item']
logger.info(f'Endpoints for {shopper_user_id} are {endpoints}')
except pinpoint.exceptions.NotFoundException:
logger.warning(f"No endoints found for user {shopper_user_id} - no location event fire")
return
if restrict_number is not None:
# Sometimes your Analytics may put unnecessary endpoints in to Pinpoint - for example, one for each session
# but with the same address - we ensure that each address only has one endpoint event fired.
removed = []
kept = defaultdict(list)
for endpoint in endpoints:
if 'ChannelType' in endpoint:
channel_type = endpoint['ChannelType']
else:
channel_type = 'unk'
if 'Address' in endpoint:
addr = endpoint['Address']
else:
addr = 'unk'
if endpoint['EndpointStatus'].upper() == 'ACTIVE' and len(kept[(channel_type, addr)]) < restrict_number:
kept[(channel_type, addr)].append(endpoint)
else:
removed.append(endpoint)
logger.info(f"Dropping endpoint with Id {endpoint['Id']}")
if len(removed) > 0:
logger.info(f"Dropped {len(removed)} endpoints.")
endpoints = []
for endpointlist in kept.values():
endpoints += endpointlist
endpoint_ids = [endpoint['Id'] for endpoint in endpoints
if restrict_to_endpoint_types is None or
('ChannelType' in endpoint and endpoint['ChannelType'] in restrict_to_endpoint_types)]
if event_timestamp_iso is None:
timestamp = datetime.now().isoformat()
else:
timestamp = event_timestamp_iso
sess_id = str(uuid.uuid4())
events = {endpoint_id: {'Endpoint': {}, # We need to provide this but empty because not here to update endpoint
'Events': {endpoint_id: # API docs state this is the endpoint ID too
{'EventType': GEOFENCE_PINPOINT_EVENTTYPE,
'Session': {'Id': sess_id, 'StartTimestamp': timestamp}, # required
'Timestamp': timestamp}}} # required
for endpoint_id in endpoint_ids}
if len(events) > 0:
logger.info(f'Firing Pinpoint events: {events}')
pinpoint.put_events(
ApplicationId=pinpoint_app_id,
EventsRequest={
'BatchItem': events
}
)
else:
logger.warning(f'Did not fire any location events for user {shopper_user_id} in app {pinpoint_app_id}')