public Observable sendBatchToProviders()

in stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java [476:605]


    public Observable sendBatchToProviders(final List<LegacyQueueMessage> messages, final String queuePath) {
        if (logger.isTraceEnabled()) {
            logger.trace("sending batch of {} notifications.", messages.size());
        }

        final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
        final ApplicationQueueManagerImpl proxy = this;
        final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
        final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());

        final Func1<LegacyQueueMessage, ApplicationQueueMessage> func = queueMessage -> {
            boolean messageCommitted = false;
            ApplicationQueueMessage message = null;
            try {
                message = (ApplicationQueueMessage) queueMessage.getBody();
                if (logger.isTraceEnabled()) {
                    logger.trace("start sending notification for device {} for Notification: {} on thread {}", message.getDeviceId(), message.getNotificationId(), Thread.currentThread().getId());
                }

                UUID deviceUUID = message.getDeviceId();

                Notification notification = notificationMap.get(message.getNotificationId());
                if (notification == null) {
                    notification = em.get(message.getNotificationId(), Notification.class);
                    notificationMap.putIfAbsent(message.getNotificationId(), notification);
                }
                TaskManager taskManager = taskMap.get(message.getNotificationId());
                if (taskManager == null) {
                    taskManager = new TaskManager(em, notification);
                    taskMap.putIfAbsent(message.getNotificationId(), taskManager);
                    taskManager = taskMap.get(message.getNotificationId());
                }

                final Map<String, Object> payloads = notification.getPayloads();
                final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
                if (logger.isTraceEnabled()) {
                    logger.trace("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
                }

                try {
                    String notifierName = message.getNotifierKey().toLowerCase();
                    ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
                    Object payload = translatedPayloads.get(notifierName);

                    TaskTracker tracker = null;

                    if(notification.getSaveReceipts()){

                        final Receipt receipt =
                            new Receipt( notification.getUuid(), message.getNotifierId(), payload, deviceUUID );
                        tracker =
                            new TaskTracker( providerAdapter.getNotifier(), taskManager, receipt, deviceUUID );

                    }
                    else {

                        tracker =
                            new TaskTracker( providerAdapter.getNotifier(), taskManager, null, deviceUUID );
                    }
                    if (!isOkToSend(notification)) {
                        tracker.failed(0, "Notification is duplicate/expired/cancelled.");
                    } else {
                        if (payload == null) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
                            }
                            tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
                        } else {
                            long now = System.currentTimeMillis();
                            try {
                                providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
                            } catch (Exception e) {
                                tracker.failed(0, e.getMessage());
                            } finally {
                                if (logger.isTraceEnabled()) {
                                    logger.trace("sending to device {} for Notification: {} duration {} ms", deviceUUID, notification.getUuid(), (System.currentTimeMillis() - now));
                                }
                            }
                        }
                    }
                    messageCommitted = true;
                } finally {
                    sendMeter.mark();
                }

            } catch (Exception e) {
                logger.error("Failure while sending", e);
                try {
                    if (!messageCommitted && queuePath != null) {
                        qm.commitMessage(queueMessage);
                    }
                } catch (Exception queueException) {
                    logger.error("Failed to commit message.", queueException);
                }
            }
            return message;
        };

        //from each queue message, process them in parallel up to 10 at a time
        Observable queueMessageObservable = Observable.from(messages).flatMap(queueMessage -> {


            return Observable.just(queueMessage).map(func).buffer(messages.size()).map(queueMessages -> {
                //for gcm this will actually send notification
                for (ProviderAdapter providerAdapter : notifierMap.values()) {
                    try {
                        providerAdapter.doneSendingNotifications();
                    } catch (Exception e) {
                        logger.error("providerAdapter.doneSendingNotifications: ", e);
                    }
                }
                //TODO: check if a notification is done and mark it
                HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>();
                for (ApplicationQueueMessage message : queueMessages) {
                    if (notifications.get(message.getNotificationId()) == null) {
                        try {
                            TaskManager taskManager = taskMap.get(message.getNotificationId());
                            notifications.put(message.getNotificationId(), message);
                            taskManager.finishedBatch();
                        } catch (Exception e) {
                            logger.error("Failed to finish batch", e);
                        }
                    }
                }
                return notifications;
            }).doOnError(throwable -> logger.error("Failed while sending", throwable));
        }, 10);

        return queueMessageObservable;
    }