in stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java [476:605]
public Observable sendBatchToProviders(final List<LegacyQueueMessage> messages, final String queuePath) {
if (logger.isTraceEnabled()) {
logger.trace("sending batch of {} notifications.", messages.size());
}
final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
final ApplicationQueueManagerImpl proxy = this;
final ConcurrentHashMap<UUID, TaskManager> taskMap = new ConcurrentHashMap<UUID, TaskManager>(messages.size());
final ConcurrentHashMap<UUID, Notification> notificationMap = new ConcurrentHashMap<UUID, Notification>(messages.size());
final Func1<LegacyQueueMessage, ApplicationQueueMessage> func = queueMessage -> {
boolean messageCommitted = false;
ApplicationQueueMessage message = null;
try {
message = (ApplicationQueueMessage) queueMessage.getBody();
if (logger.isTraceEnabled()) {
logger.trace("start sending notification for device {} for Notification: {} on thread {}", message.getDeviceId(), message.getNotificationId(), Thread.currentThread().getId());
}
UUID deviceUUID = message.getDeviceId();
Notification notification = notificationMap.get(message.getNotificationId());
if (notification == null) {
notification = em.get(message.getNotificationId(), Notification.class);
notificationMap.putIfAbsent(message.getNotificationId(), notification);
}
TaskManager taskManager = taskMap.get(message.getNotificationId());
if (taskManager == null) {
taskManager = new TaskManager(em, notification);
taskMap.putIfAbsent(message.getNotificationId(), taskManager);
taskManager = taskMap.get(message.getNotificationId());
}
final Map<String, Object> payloads = notification.getPayloads();
final Map<String, Object> translatedPayloads = translatePayloads(payloads, notifierMap);
if (logger.isTraceEnabled()) {
logger.trace("sending notification for device {} for Notification: {}", deviceUUID, notification.getUuid());
}
try {
String notifierName = message.getNotifierKey().toLowerCase();
ProviderAdapter providerAdapter = notifierMap.get(notifierName.toLowerCase());
Object payload = translatedPayloads.get(notifierName);
TaskTracker tracker = null;
if(notification.getSaveReceipts()){
final Receipt receipt =
new Receipt( notification.getUuid(), message.getNotifierId(), payload, deviceUUID );
tracker =
new TaskTracker( providerAdapter.getNotifier(), taskManager, receipt, deviceUUID );
}
else {
tracker =
new TaskTracker( providerAdapter.getNotifier(), taskManager, null, deviceUUID );
}
if (!isOkToSend(notification)) {
tracker.failed(0, "Notification is duplicate/expired/cancelled.");
} else {
if (payload == null) {
if (logger.isDebugEnabled()) {
logger.debug("selected device {} for notification {} doesn't have a valid payload. skipping.", deviceUUID, notification.getUuid());
}
tracker.failed(0, "failed to match payload to " + message.getNotifierId() + " notifier");
} else {
long now = System.currentTimeMillis();
try {
providerAdapter.sendNotification(message.getNotifierId(), payload, notification, tracker);
} catch (Exception e) {
tracker.failed(0, e.getMessage());
} finally {
if (logger.isTraceEnabled()) {
logger.trace("sending to device {} for Notification: {} duration {} ms", deviceUUID, notification.getUuid(), (System.currentTimeMillis() - now));
}
}
}
}
messageCommitted = true;
} finally {
sendMeter.mark();
}
} catch (Exception e) {
logger.error("Failure while sending", e);
try {
if (!messageCommitted && queuePath != null) {
qm.commitMessage(queueMessage);
}
} catch (Exception queueException) {
logger.error("Failed to commit message.", queueException);
}
}
return message;
};
//from each queue message, process them in parallel up to 10 at a time
Observable queueMessageObservable = Observable.from(messages).flatMap(queueMessage -> {
return Observable.just(queueMessage).map(func).buffer(messages.size()).map(queueMessages -> {
//for gcm this will actually send notification
for (ProviderAdapter providerAdapter : notifierMap.values()) {
try {
providerAdapter.doneSendingNotifications();
} catch (Exception e) {
logger.error("providerAdapter.doneSendingNotifications: ", e);
}
}
//TODO: check if a notification is done and mark it
HashMap<UUID, ApplicationQueueMessage> notifications = new HashMap<>();
for (ApplicationQueueMessage message : queueMessages) {
if (notifications.get(message.getNotificationId()) == null) {
try {
TaskManager taskManager = taskMap.get(message.getNotificationId());
notifications.put(message.getNotificationId(), message);
taskManager.finishedBatch();
} catch (Exception e) {
logger.error("Failed to finish batch", e);
}
}
}
return notifications;
}).doOnError(throwable -> logger.error("Failed while sending", throwable));
}, 10);
return queueMessageObservable;
}