protected List callEventHandlers()

in stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventServiceImpl.java [408:527]


    /**
     * Dispatches each queue message to the handler matching its event type and collects one
     * {@link IndexEventResult} per message.
     *
     * <p>A result that carries the original queue message marks that message as processed (see the
     * note at the success return below: such messages get ack'd). Every failure path -- a body that
     * is not an {@code AsyncEvent}, a null body, an {@code IndexDocNotFoundException}, or any other
     * exception raised while handling -- produces a result with both optionals absent, so the message
     * is not ack'd. Failures are handled per message; one bad message does not stop the rest of the
     * batch from being processed.
     *
     * <p>The result timestamp is the event's creation time when the event could be read, otherwise
     * the current system time.
     *
     * @param messages queue messages to process; must not be null
     * @return one result per input message
     */
    protected List<IndexEventResult> callEventHandlers(final List<LegacyQueueMessage> messages) {

        if (logger.isDebugEnabled()) {
            logger.debug("callEventHandlers with {} message(s)", messages.size());
        }

        Stream<IndexEventResult> indexEventResults = messages.stream().map(message ->

        {
            if (logger.isDebugEnabled()) {
                logger.debug("Queue message with ID {} has been received {} time(s)",
                    message.getMessageId(),
                    message.getReceiveCount());
            }

            final AsyncEvent event;
            try {
                event = (AsyncEvent) message.getBody();

            } catch (ClassCastException cce) {
                logger.error("Failed to deserialize message body", cce);
                return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
            }

            if (event == null) {
                logger.error("AsyncEvent type or event is null!");
                return new IndexEventResult(Optional.absent(), Optional.absent(), System.currentTimeMillis());
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Processing event with type {}", event.getClass().getSimpleName());
            }

            try {

                // stays empty for event types whose handlers do not return an index operation message
                IndexOperationMessage single = new IndexOperationMessage();

                // normal indexing event for an entity
                if (event instanceof EntityIndexEvent) {

                    single = handleEntityIndexUpdate(message);

                }
                // normal indexing event for an edge
                else if (event instanceof EdgeIndexEvent) {

                    single = handleEdgeIndex(message);

                }
                // deletes are 2-part, actual IO to delete data, then queue up a de-index
                else if (event instanceof EdgeDeleteEvent) {

                    single = handleEdgeDelete(message);
                }
                // deletes are 2-part, actual IO to delete data, then queue up a de-index
                else if (event instanceof EntityDeleteEvent) {

                    single = handleEntityDelete(message);
                }
                // initialization has special logic, therefore a special event type and no index operation message
                else if (event instanceof InitializeApplicationIndexEvent) {

                    handleInitializeApplicationIndex(event, message);
                }
                // this is the main event that pulls the index doc from map persistence and hands to the index producer
                else if (event instanceof ElasticsearchIndexEvent) {

                    handleIndexOperation((ElasticsearchIndexEvent) event);

                } else if (event instanceof DeIndexOldVersionsEvent) {

                    single = handleDeIndexOldVersionEvent((DeIndexOldVersionsEvent) event);

                } else {

                    // String.valueOf keeps this diagnostic intact even if the string body is null
                    throw new Exception("Unknown EventType for message: "
                        + String.valueOf(message.getStringBody()).trim());
                }


                // the two event types excluded here never assign 'single', so an empty message is expected for them
                if (!(event instanceof ElasticsearchIndexEvent)
                    && !(event instanceof InitializeApplicationIndexEvent)
                    && single.isEmpty()) {
                    logger.warn("No index operation messages came back from event processing for eventType: {}, msgId: {}, msgBody: {}",
                        event.getClass().getSimpleName(), message.getMessageId(), message.getStringBody());
                }


                // if no exception happens and the QueueMessage is returned in these results, it will get ack'd
                return new IndexEventResult(Optional.of(single), Optional.of(message), event.getCreationTime());

            } catch (IndexDocNotFoundException e) {

                // this exception is thrown when we wait before trying quorum read on map persistence.
                // return empty event result so the event's message doesn't get ack'd
                if (logger.isDebugEnabled()) {
                    logger.debug(e.getMessage());
                }
                return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());

            } catch (Exception e) {

                // NPEs don't have a detail message, so add something for our log statement to identify better
                final String errorMessage;
                if (e instanceof NullPointerException) {
                    errorMessage = "NullPointerException";
                } else {
                    errorMessage = e.getMessage();
                }

                // null-safe so that this handler can never throw and abort the rest of the batch
                final String failedBody = String.valueOf(message.getStringBody()).trim();

                // if the event fails to process, log and return empty message result so it doesn't get ack'd;
                // the exception is passed as the trailing argument so the logger can record its stack trace
                logger.error("{}. Failed to process message: {}", errorMessage, failedBody, e);
                return new IndexEventResult(Optional.absent(), Optional.absent(), event.getCreationTime());
            }
        });


        return indexEventResults.collect(Collectors.toList());
    }