in rest/src/main/java/org/apache/unomi/rest/service/impl/RestServiceUtilsImpl.java [225:287]
/**
 * Sends the supplied events through the event service on behalf of the profile and session
 * held by the request context, accumulating the resulting change flags on that context.
 * <p>
 * No event is executed when {@code events} is {@code null} or when the context profile is a
 * {@link Persona}. Otherwise the total item count is set to the list size and the processed
 * item count is incremented for every entry, including entries that are skipped (null entry,
 * missing event type, event not allowed, event type filtered for the profile). Processing
 * stops at the first event after which the accumulated changes carry
 * {@link EventService#ERROR}; that event is not counted as processed.
 *
 * @param events               the events received with the request, may be {@code null}
 * @param eventsRequestContext the context providing the request, response, profile, session
 *                             and timestamp, and receiving the counters and change flags
 * @return the same {@code eventsRequestContext} instance, updated in place
 */
public EventsRequestContext performEventsRequest(List<Event> events, EventsRequestContext eventsRequestContext) {
    List<String> filteredEventTypes = privacyService.getFilteredEventTypes(eventsRequestContext.getProfile());
    String thirdPartyId = eventService.authenticateThirdPartyServer(eventsRequestContext.getRequest().getHeader("X-Unomi-Peer"),
            eventsRequestContext.getRequest().getRemoteAddr());

    // Execute provided events if any; events are not executed for Persona profiles.
    if (events == null || eventsRequestContext.getProfile() instanceof Persona) {
        return eventsRequestContext;
    }

    // set Total items on context
    eventsRequestContext.setTotalItems(events.size());

    for (Event event : events) {
        eventsRequestContext.setProcessedItems(eventsRequestContext.getProcessedItems() + 1);

        // Skip null entries the same way as events without a type instead of failing the whole batch.
        if (event == null || event.getEventType() == null) {
            continue;
        }
        // Run the cheap rejection checks before building the event that would be sent.
        if (!eventService.isEventAllowed(event, thirdPartyId)) {
            LOGGER.warn("Event is not allowed : {}", event.getEventType());
            continue;
        }
        if (filteredEventTypes != null && filteredEventTypes.contains(event.getEventType())) {
            LOGGER.debug("Profile is filtering event type {}", event.getEventType());
            continue;
        }

        Event eventToSend = buildEventToSend(event, eventsRequestContext, thirdPartyId);
        eventToSend.getAttributes().put(Event.HTTP_REQUEST_ATTRIBUTE, eventsRequestContext.getRequest());
        eventToSend.getAttributes().put(Event.HTTP_RESPONSE_ATTRIBUTE, eventsRequestContext.getResponse());
        LOGGER.debug("Received event {} for profile={} session={} target={} timestamp={}", event.getEventType(),
                eventsRequestContext.getProfile().getItemId(),
                eventsRequestContext.getSession() != null ? eventsRequestContext.getSession().getItemId() : null,
                event.getTarget(), eventsRequestContext.getTimestamp());

        eventsRequestContext.addChanges(eventService.send(eventToSend));

        // If the event execution changes the profile we need to update it so the next event use the right profile
        if ((eventsRequestContext.getChanges() & EventService.PROFILE_UPDATED) == EventService.PROFILE_UPDATED) {
            eventsRequestContext.setProfile(eventToSend.getProfile());
        }
        if (eventsRequestContext.isNewSession()) {
            eventsRequestContext.getSession().getOriginEventIds().add(eventToSend.getItemId());
            eventsRequestContext.getSession().getOriginEventTypes().add(eventToSend.getEventType());
        }
        if ((eventsRequestContext.getChanges() & EventService.ERROR) == EventService.ERROR) {
            // Don't count the event that failed
            eventsRequestContext.setProcessedItems(eventsRequestContext.getProcessedItems() - 1);
            LOGGER.error("Error processing events. Total number of processed events: {}/{}", eventsRequestContext.getProcessedItems(), eventsRequestContext.getTotalItems());
            break;
        }
    }
    return eventsRequestContext;
}

/**
 * Builds the event that is actually sent, bound to the context's current session, profile and
 * timestamp and carrying the received event's type, scope, source, target, properties,
 * persistence flag and flattened properties.
 * <p>
 * The received item id is reused only when a third party server was authenticated and the
 * received event has an item id. The profile id is cleared when the context profile is
 * anonymous.
 */
private Event buildEventToSend(Event event, EventsRequestContext eventsRequestContext, String thirdPartyId) {
    Event eventToSend;
    if (thirdPartyId != null && event.getItemId() != null) {
        eventToSend = new Event(event.getItemId(), event.getEventType(), eventsRequestContext.getSession(), eventsRequestContext.getProfile(), event.getScope(),
                event.getSource(), event.getTarget(), event.getProperties(), eventsRequestContext.getTimestamp(), event.isPersistent());
    } else {
        eventToSend = new Event(event.getEventType(), eventsRequestContext.getSession(), eventsRequestContext.getProfile(), event.getScope(), event.getSource(),
                event.getTarget(), event.getProperties(), eventsRequestContext.getTimestamp(), event.isPersistent());
    }
    eventToSend.setFlattenedProperties(event.getFlattenedProperties());
    if (eventsRequestContext.getProfile().isAnonymousProfile()) {
        // Do not keep track of profile in event
        eventToSend.setProfileId(null);
    }
    return eventToSend;
}