private void execute()

in stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java [146:303]


    /**
     * Worker loop for one listener thread.
     *
     * <p>Each round optionally sleeps {@code sleepBetweenRuns} ms, takes up to {@code MAX_TAKE}
     * messages from the queue named {@code queueName}, groups them by application id, hands each
     * group to that application's {@link ApplicationQueueManager}, waits for all sends to finish,
     * and then commits the messages. Every {@code consecutiveCallsToRemoveDevices} successful
     * batches it asks each cached manager to check for inactive devices.
     *
     * <p>Failures (including a failure of the dequeue call itself) are logged and followed by a
     * linearly growing back-off capped at 15 seconds. The loop ends when the thread is
     * interrupted; the interrupt flag is left set for the caller.
     *
     * @param threadNumber index of this worker, used only to build the thread name
     */
    private void execute(int threadNumber){

        // The daemon flag of a running thread cannot be changed: Thread.setDaemon throws
        // IllegalThreadStateException once the thread is alive, so no attempt is made here.

        Thread.currentThread().setName(getClass().getSimpleName()+"_Push-"+ RandomStringUtils.randomAlphanumeric(4)+"-"+threadNumber);

        final AtomicInteger consecutiveExceptions = new AtomicInteger();

        if (logger.isTraceEnabled()) {
            logger.trace("QueueListener: Starting execute process.");
        }

        Meter meter = metricsService.getMeter(QueueListener.class, "execute.commit");
        com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");

        if (logger.isTraceEnabled()) {
            logger.trace("getting from queue {} ", queueName);
        }

        LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL);
        LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);

        // counts successfully processed batches since the last inactive-device check
        final AtomicLong runCount = new AtomicLong(0);

        // run until this thread is interrupted
        while ( !Thread.currentThread().isInterrupted() ) {

            if(sleepBetweenRuns > 0) {
                if (logger.isTraceEnabled()) {
                    logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns);
                }
                try {
                    Thread.sleep(sleepBetweenRuns);
                } catch (InterruptedException interrupted) {
                    // keep the interrupt visible to the caller and stop polling
                    Thread.currentThread().interrupt();
                    break;
                }
            }

            Timer.Context timerContext = timer.time();
            try {
                rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
                    .buffer(MAX_TAKE)
                    .doOnNext(messages -> {

                        try {
                            if (logger.isTraceEnabled()) {
                                logger.trace("retrieved batch of {} messages from queue {}",messages.size(),queueName);
                            }

                            // NOTE(review): buffer(count) may emit nothing at all for an empty
                            // source, in which case the "no messages" branch below would never
                            // run -- confirm against the RxJava version in use.
                            if (messages.size() > 0) {
                                HashMap<UUID, List<LegacyQueueMessage>> messageMap = new HashMap<>(messages.size());

                                // Group queue messages by application id
                                // (they are all probably going to the same place).
                                for (LegacyQueueMessage message : messages) {
                                    //TODO: stop copying around this area as it gets notification specific.
                                    ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
                                    UUID applicationId = queueMessage.getApplicationId();
                                    messageMap.computeIfAbsent(applicationId, id -> new ArrayList<>()).add(message);
                                }

                                long now = System.currentTimeMillis();
                                Observable merge = null;

                                // send each application's messages together
                                for (Map.Entry<UUID, List<LegacyQueueMessage>> entry : messageMap.entrySet()) {
                                    UUID applicationId = entry.getKey();

                                    ApplicationQueueManager manager = applicationQueueManagerCache
                                        .getApplicationQueueManager(
                                            emf.getEntityManager(applicationId),
                                            legacyQueueManager,
                                            new JobScheduler(smf.getServiceManager(applicationId),
                                                             emf.getEntityManager(applicationId)),
                                            metricsService,
                                            properties
                                        );

                                    if (logger.isTraceEnabled()) {
                                        logger.trace("send batch for app {} of {} messages",
                                            entry.getKey(), entry.getValue().size());
                                    }
                                    Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);

                                    if (merge == null) {
                                        merge = current;
                                    } else {
                                        merge = Observable.merge(merge,current);
                                    }
                                }

                                // wait for every per-application send before committing
                                if (merge != null) {
                                    merge.toBlocking().lastOrDefault(null);
                                }
                                legacyQueueManager.commitMessages(messages);

                                meter.mark(messages.size());
                                if (logger.isTraceEnabled()) {
                                    logger.trace("sent batch {} messages duration {} ms",
                                        messages.size(), System.currentTimeMillis() - now);
                                }

                                if(runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0){
                                    for(ApplicationQueueManager aqm : applicationQueueManagerCache.asMap().values()){
                                        try {
                                            aqm.asyncCheckForInactiveDevices();
                                        }catch (Exception inactiveDeviceException){
                                            logger.error("Inactive Device Get failed",inactiveDeviceException);
                                        }
                                    }
                                    //clear everything
                                    runCount.set(0);
                                }
                            }
                            else{
                                if (logger.isTraceEnabled()) {
                                    logger.trace("no messages...sleep...{}", sleepWhenNoneFound);
                                }
                                try {
                                    Thread.sleep(sleepWhenNoneFound);
                                } catch (InterruptedException interrupted){
                                    // restore the flag so the outer loop terminates
                                    Thread.currentThread().interrupt();
                                }
                            }
                            timerContext.stop();
                            // a clean round resets the failure back-off
                            consecutiveExceptions.set(0);
                        } catch (Exception ex){
                            backOffAfterFailure(legacyQueueManager, consecutiveExceptions, ex);
                        }
                    })
                    .toBlocking().lastOrDefault(null);
            } catch (Exception ex) {
                // the dequeue call itself (or the pipeline around it) failed; back off and
                // retry instead of letting the exception end this worker
                backOffAfterFailure(legacyQueueManager, consecutiveExceptions, ex);
            }
        }

        if (logger.isTraceEnabled()) {
            logger.trace("QueueListener: thread interrupted, leaving execute loop.");
        }
    }

    /**
     * Logs a failed round, clears the queue name cache and sleeps for
     * {@code sleepWhenNoneFound * consecutiveFailures} ms, capped at 15 seconds.
     * If the sleep is interrupted the thread's interrupt flag is restored.
     */
    private void backOffAfterFailure(LegacyQueueManager legacyQueueManager,
                                     AtomicInteger consecutiveExceptions,
                                     Exception ex) {
        logger.error("failed to dequeue",ex);

        // clear the queue name cache b/c tests might have wiped the keyspace
        legacyQueueManager.clearQueueNameCache();
        try {
            long sleeptime = sleepWhenNoneFound*consecutiveExceptions.incrementAndGet();
            long maxSleep = 15000;
            sleeptime = Math.min(sleeptime, maxSleep);
            logger.info("sleeping due to failures {} ms", sleeptime);
            Thread.sleep(sleeptime);
        } catch (InterruptedException ie){
            if (logger.isTraceEnabled()) {
                logger.trace("sleep interrupted");
            }
            Thread.currentThread().interrupt();
        }
    }