public void queueNotification()

in stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java [141:432]


    public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
        if (scheduleQueueJob(notification)) {
            em.update(notification);
            return;
        }

        long startTime = System.currentTimeMillis();

        if (notification.getCanceled() == Boolean.TRUE) {
            if (logger.isDebugEnabled()) {
                logger.debug("notification " + notification.getUuid() + " canceled");
            }
            if (jobExecution != null) {
                jobExecution.killed();
            }
            return;
        }

        final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query
        final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
        final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues

        // Get devices in querystring, and make sure you have access
        if (pathQuery != null) {
            final HashMap<Object, ProviderAdapter> notifierMap = getAdapterMap();
            if (logger.isTraceEnabled()) {
                logger.trace("notification {} start query", notification.getUuid());
            }

            if(logger.isTraceEnabled()) {
                logger.trace("Notification {} started processing", notification.getUuid());
            }



            // The main iterator can use graph traversal or index querying based on payload property. Default is Index.
            final Iterator<Device> iterator;
            if( notification.getUseGraph()){
                iterator = pathQuery.graphIterator(em);
            }else{
                iterator = pathQuery.iterator(em);
            }

            /**** Old code to scheduler large sets of data, but now the processing is fired off async in the background.
                Leaving this only a reference of how to do it, if needed in future.

                    //if there are more pages (defined by PAGE_SIZE) you probably want this to be async,
                    //also if this is already a job then don't reschedule

                    if (iterator instanceof ResultsIterator
                                && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {

                        if(logger.isTraceEnabled()){
                            logger.trace("Scheduling notification job as it has multiple pages of devices.");
                        }
                        jobScheduler.scheduleQueueJob(notification, true);
                        em.update(notification);
                        return;
                     }
             ****/

            final UUID appId = em.getApplication().getUuid();
            final Map<String, Object> payloads = notification.getPayloads();

            final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> {

                try {

                    //logger.info("Preparing notification queue message for device: {}", deviceRef.getUuid());

                    long now = System.currentTimeMillis();

                    String notifierId = null;
                    String notifierKey = null;

                    //find the device notifier info, match it to the payload
                    for (Map.Entry<String, Object> entry : payloads.entrySet()) {
                        ProviderAdapter adapter = notifierMap.get(entry.getKey().toLowerCase());
                        now = System.currentTimeMillis();
                        String providerId = getProviderId(deviceRef, adapter.getNotifier());
                        if (providerId != null) {
                            notifierId = providerId;
                            notifierKey = entry.getKey().toLowerCase();
                            break;
                        }
                        if (logger.isTraceEnabled()) {
                            logger.trace("Provider query for notification {} device {} took {} ms", notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - now));
                        }
                    }

                    if (notifierId == null) {
                        //TODO need to leverage optional here
                        return Optional.empty();
                    }

                    ApplicationQueueMessage message = new ApplicationQueueMessage(appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
                    if (notification.getQueued() == null) {

                        // update queued time
                        notification.setQueued(System.currentTimeMillis());

                    }

                    deviceCount.incrementAndGet();

                    return Optional.of(message);


                } catch (Exception deviceLoopException) {
                    logger.error("Failed to add device", deviceLoopException);
                    errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException);

                    return Optional.empty();
                }

            };


            final Map<String, Object> filters = notification.getFilters();

            Observable processMessagesObservable = Observable.create(new IteratorObservable<EntityRef>(iterator))

                .flatMap( entityRef -> {

                    return Observable.just(entityRef).flatMap(ref->{

                        List<Entity> entities = new ArrayList<>();

                            if( ref.getType().equals(User.ENTITY_TYPE)){

                                Query devicesQuery = new Query();
                                devicesQuery.setCollection("devices");
                                devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
                                devicesQuery.setLimit(50); // for now, assume a user has no more than 50 devices

                                try {

                                   entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()), devicesQuery.getCollection(), devicesQuery).getEntities();

                                }catch (Exception e){

                                    logger.error("Unable to load devices for user: {}", ref.getUuid());
                                    return Observable.empty();
                                }


                            }else if ( ref.getType().equals(Device.ENTITY_TYPE)){

                                try{
                                    entities.add(em.get(ref));

                                }catch(Exception e){

                                    logger.error("Unable to load device: {}", ref.getUuid());
                                    return Observable.empty();

                                }

                            }
                        return Observable.from(entities);

                        })
                        .filter( device -> {

                            if(logger.isTraceEnabled()) {
                                logger.trace("Filtering device: {}", device.getUuid());
                            }

                            if(notification.getUseGraph() && filters.size() > 0 ) {

                                for (Map.Entry<String, Object> entry : filters.entrySet()) {

                                    if ((device.getDynamicProperties().get(entry.getKey()) != null &&
                                        device.getDynamicProperties().get(entry.getKey()).equals(entry.getValue())) ||

                                        (device.getProperties().get(entry.getKey()) != null &&
                                            device.getProperties().get(entry.getKey()).equals(entry.getValue()))

                                        ) {


                                        return true;
                                    }

                                }
                                if(logger.isTraceEnabled()) {
                                    logger.trace("Push notification filter did not match for notification {}, so removing from notification",
                                        device.getUuid(), notification.getUuid());
                                }
                                return false;


                            }

                            return true;

                        })
                        .map(sendMessageFunction)
                        .subscribeOn(Schedulers.io());

                }, concurrencyFactor)
                .distinct( queueMessage -> {

                    if(queueMessage.isPresent()) {
                        return queueMessage.get().getDeviceId();
                    }

                    return UUIDUtils.newTimeUUID(); // this should be distinct, default handling for the Optional.empty() case

                } )
                .doOnNext( message -> {
                    try {

                        if(message.isPresent()){

                            if(logger.isTraceEnabled()) {
                                logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
                            }
                            qm.sendMessageToLocalRegion( message.get(), Boolean.TRUE );
                            queueMeter.mark();
                        }

                    } catch (Exception e) {

                        if(message.isPresent()){
                            logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
                                message.get().getNotificationId(), message.get().getDeviceId());
                        }
                        else{
                            logger.error("Unable to queue notification as it's not present when trying to send to queue");
                        }

                    }


                })
                .doOnError(throwable -> {

                    logger.error("Error while processing devices for notification : {}, error: {}", notification.getUuid(), throwable.getMessage());
                    notification.setProcessingFinished(-1L);
                    notification.setDeviceProcessedCount(deviceCount.get());
                    logger.warn("Partial notification. Only {} devices processed for notification {}",
                        deviceCount.get(), notification.getUuid());
                    try {
                        em.update(notification);
                    }catch (Exception e){
                        logger.error("Error updating negative processing status when processing failed.");
                    }

                })
                .doOnCompleted( () -> {

                    try {
                        notification.setProcessingFinished(System.currentTimeMillis());
                        notification.setDeviceProcessedCount(deviceCount.get());
                        em.update(notification);
                        if(logger.isTraceEnabled()) {
                            logger.trace("Notification {} finished processing {} device(s)", notification.getUuid(), deviceCount.get());
                        }

                    } catch (Exception e) {
                        logger.error("Unable to set processing finished timestamp for notification");
                    }

                });

            processMessagesObservable.subscribeOn(Schedulers.io()).subscribe(); // fire the queuing into the background

        }

        // update queued time
        Map<String, Object> properties = new HashMap<>(2);
        properties.put("queued", notification.getQueued());
        properties.put("state", notification.getState());
        if (errorMessages.size() > 0) {
            if (notification.getErrorMessage() == null) {
                notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
            }
        }

        notification.addProperties(properties);
        em.update(notification);


        // if no devices, go ahead and mark the batch finished
        if (deviceCount.get() <= 0 ) {
            TaskManager taskManager = new TaskManager(em, notification);
            taskManager.finishedBatch();
        }


    }