in stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java [408:527]
/**
 * Dispatches each queue message to the handler matching its {@code AsyncEvent} type and
 * collects one {@code IndexEventResult} per message, in the order the messages were given.
 *
 * <p>A result carries the queue message only when its handler completed without throwing.
 * Every failure path (body is not an {@code AsyncEvent}, body is null, index doc not found,
 * or any other exception) yields a result with both optionals absent, so the message is not
 * returned to the caller. Per the note in the success path, a message that IS returned gets
 * ack'd by the consumer of these results.
 *
 * <p>A failure while processing one message never propagates out of this method; it is
 * logged (with stack trace) and converted to an empty result so the rest of the batch is
 * still processed.
 *
 * @param messages the queue messages to process
 * @return one result per input message
 */
protected List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {

    if (logger.isDebugEnabled()) {
        logger.debug("callEventHandlers with {} message(s)", messages.size());
    }

    Stream<IndexEventResult> indexEventResults = messages.stream().map(message ->
    {
        if(logger.isDebugEnabled()){
            logger.debug("Queue message with ID {} has been received {} time(s)",
                message.getMessageId(),
                message.getReceiveCount() );
        }

        // Both catch/if branches below return, so 'event' is definitely assigned afterwards.
        final AsyncEvent event;
        try {
            event = (AsyncEvent) message.getBody();
        } catch (ClassCastException cce) {
            logger.error("Failed to deserialize message body", cce);
            // no event available, so there is no creation time to report; use "now"
            return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
        }

        if (event == null) {
            logger.error("AsyncEvent type or event is null!");
            return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
        }

        if (logger.isDebugEnabled()) {
            logger.debug("Processing event with type {}", event.getClass().getSimpleName());
        }

        try {

            // stays as this empty message for event types that produce no index operations
            IndexOperationMessage single = new IndexOperationMessage();

            // normal indexing event for an entity
            if ( event instanceof EntityIndexEvent ){
                single = handleEntityIndexUpdate( message );
            }
            // normal indexing event for an edge
            else if ( event instanceof EdgeIndexEvent ){
                single = handleEdgeIndex( message );
            }
            // deletes are 2-part, actual IO to delete data, then queue up a de-index
            else if ( event instanceof EdgeDeleteEvent ) {
                single = handleEdgeDelete( message );
            }
            // deletes are 2-part, actual IO to delete data, then queue up a de-index
            else if ( event instanceof EntityDeleteEvent ) {
                single = handleEntityDelete( message );
            }
            // initialization has special logic, therefore a special event type and no index operation message
            else if ( event instanceof InitializeApplicationIndexEvent ) {
                handleInitializeApplicationIndex(event, message);
            }
            // this is the main event that pulls the index doc from map persistence and hands to the index producer
            else if (event instanceof ElasticsearchIndexEvent) {
                handleIndexOperation((ElasticsearchIndexEvent) event);
            } else if (event instanceof DeIndexOldVersionsEvent) {
                single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);
            } else {
                // guard the trim so a missing string body cannot mask the real problem with an NPE
                final String unknownBody = message.getStringBody();
                throw new Exception("Unknown EventType for message: "
                    + (unknownBody == null ? null : unknownBody.trim()));
            }

            // the two event types that never produce an index operation message are exempt from the warning
            if( !(event instanceof ElasticsearchIndexEvent)
                && !(event instanceof InitializeApplicationIndexEvent)
                && single.isEmpty() ){
                logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}",
                    event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody());
            }

            // if no exception happens and the QueueMessage is returned in these results, it will get ack'd
            return new IndexEventResult(Optional.of(single), Optional.of(message), event.getCreationTime());

        } catch (IndexDocNotFoundException e){

            // this exception is thrown when we wait before trying quorum read on map persistence.
            // return empty event result so the event's message doesn't get ack'd
            if(logger.isDebugEnabled()){
                logger.debug(e.getMessage());
            }
            return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());

        } catch (Exception e) {

            // NPEs don't have a detail message, so add something for our log statement to identify better
            final String errorMessage;
            if( e instanceof NullPointerException ) {
                errorMessage = "NullPointerException";
            }else{
                errorMessage = e.getMessage();
            }

            // Nothing in this handler may throw: an exception escaping here would abort the
            // whole batch instead of just leaving this one message un-ack'd.
            // NOTE(review): whether getStringBody() can be null is not visible here; guarded anyway.
            final String failedBody = message.getStringBody();
            final String failedBodyForLog = failedBody == null ? null : failedBody.trim();

            // if the event fails to process, log and return empty message result so it doesn't get ack'd.
            // The throwable is passed as the trailing argument so the stack trace is logged too.
            logger.error("{}. Failed to process message: {}", errorMessage, failedBodyForLog, e );
            return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
        }
    });

    return indexEventResults.collect(Collectors.toList());
}