in stack/services/src/main/java/org/apache/usergrid/services/notifications/impl/ApplicationQueueManagerImpl.java [141:432]
/**
 * Resolves the devices targeted by a notification and places one
 * {@link ApplicationQueueMessage} per device on the local-region queue.
 *
 * <p>Flow, as implemented below:
 * <ol>
 *   <li>If {@code scheduleQueueJob(notification)} returns {@code true} the notification is
 *       persisted and the method returns without queueing anything.</li>
 *   <li>If the notification is canceled, {@code jobExecution.killed()} is invoked (when a job
 *       execution was supplied) and the method returns.</li>
 *   <li>Otherwise the path query is iterated (graph or index iterator depending on
 *       {@code notification.getUseGraph()}), users are expanded into their devices, devices are
 *       optionally filtered, de-duplicated by device id, and queued. This pipeline is subscribed
 *       on {@code Schedulers.io()}, so it runs in the background after this method returns.</li>
 *   <li>The {@code queued} and {@code state} properties are written back and the notification
 *       is persisted.</li>
 * </ol>
 *
 * @param notification the notification to queue
 * @param jobExecution the job execution driving this call; may be {@code null}. It is only used
 *                     to signal {@code killed()} for a canceled notification.
 * @throws Exception if a synchronous step (for example {@code em.update}) fails; failures inside
 *                   the background pipeline are logged and recorded on the notification instead
 */
public void queueNotification(final Notification notification, final JobExecution jobExecution) throws Exception {
    if (scheduleQueueJob(notification)) {
        em.update(notification);
        return;
    }

    // Boolean.TRUE.equals(...) is null-safe and does not rely on boxed-instance identity.
    if (Boolean.TRUE.equals(notification.getCanceled())) {
        if (logger.isDebugEnabled()) {
            logger.debug("notification " + notification.getUuid() + " canceled");
        }
        if (jobExecution != null) {
            jobExecution.killed();
        }
        return;
    }

    final PathQuery<Device> pathQuery = notification.getPathQuery().buildPathQuery(); //devices query
    final AtomicInteger deviceCount = new AtomicInteger(); //count devices so you can make a judgement on batching
    final ConcurrentLinkedQueue<String> errorMessages = new ConcurrentLinkedQueue<>(); //build up list of issues

    // Get devices in querystring, and make sure you have access
    if (pathQuery != null) {
        final Map<Object, ProviderAdapter> notifierMap = getAdapterMap();
        if (logger.isTraceEnabled()) {
            logger.trace("notification {} start query", notification.getUuid());
            logger.trace("Notification {} started processing", notification.getUuid());
        }

        // The main iterator can use graph traversal or index querying based on payload property. Default is Index.
        final Iterator<Device> iterator;
        if (notification.getUseGraph()) {
            iterator = pathQuery.graphIterator(em);
        } else {
            iterator = pathQuery.iterator(em);
        }

        /**** Old code to scheduler large sets of data, but now the processing is fired off async in the background.
         Leaving this only a reference of how to do it, if needed in future.
         //if there are more pages (defined by PAGE_SIZE) you probably want this to be async,
         //also if this is already a job then don't reschedule
         if (iterator instanceof ResultsIterator
         && ((ResultsIterator) iterator).hasPages() && jobExecution == null) {
         if(logger.isTraceEnabled()){
         logger.trace("Scheduling notification job as it has multiple pages of devices.");
         }
         jobScheduler.scheduleQueueJob(notification, true);
         em.update(notification);
         return;
         }
         ****/

        final UUID appId = em.getApplication().getUuid();
        final Map<String, Object> payloads = notification.getPayloads();

        // Maps a device to its queue message, or Optional.empty() when the device has no
        // provider id for any of the notification's payload keys (or when the lookup fails).
        final Func1<EntityRef, Optional<ApplicationQueueMessage>> sendMessageFunction = deviceRef -> {
            try {
                String notifierId = null;
                String notifierKey = null;

                //find the device notifier info, match it to the payload
                for (Map.Entry<String, Object> entry : payloads.entrySet()) {
                    final String payloadKey = entry.getKey().toLowerCase();
                    final ProviderAdapter adapter = notifierMap.get(payloadKey);
                    if (adapter == null) {
                        // No adapter registered under this payload key. Skip it instead of
                        // dereferencing null, so a later payload key can still match.
                        if (logger.isTraceEnabled()) {
                            logger.trace("No provider adapter for payload key {} on notification {}",
                                payloadKey, notification.getUuid());
                        }
                        continue;
                    }

                    final long lookupStart = System.currentTimeMillis();
                    final String providerId = getProviderId(deviceRef, adapter.getNotifier());
                    // Logged before the match check so the timing is reported for hits as well as misses.
                    if (logger.isTraceEnabled()) {
                        logger.trace("Provider query for notification {} device {} took {} ms",
                            notification.getUuid(), deviceRef.getUuid(), (System.currentTimeMillis() - lookupStart));
                    }
                    if (providerId != null) {
                        notifierId = providerId;
                        notifierKey = payloadKey;
                        break;
                    }
                }

                if (notifierId == null) {
                    //TODO need to leverage optional here
                    return Optional.empty();
                }

                final ApplicationQueueMessage message = new ApplicationQueueMessage(
                    appId, notification.getUuid(), deviceRef.getUuid(), notifierKey, notifierId);
                if (notification.getQueued() == null) {
                    // update queued time
                    notification.setQueued(System.currentTimeMillis());
                }
                deviceCount.incrementAndGet();
                return Optional.of(message);
            } catch (Exception deviceLoopException) {
                logger.error("Failed to add device", deviceLoopException);
                errorMessages.add("Failed to add device: " + deviceRef.getUuid() + ", error:" + deviceLoopException);
                return Optional.empty();
            }
        };

        final Map<String, Object> filters = notification.getFilters();

        final Observable<Optional<ApplicationQueueMessage>> processMessagesObservable =
            Observable.create(new IteratorObservable<EntityRef>(iterator))
                .flatMap(entityRef -> {
                    return Observable.just(entityRef).flatMap(ref -> {
                        // Expand the reference into concrete device entities: a user becomes
                        // its devices, a device is loaded as-is, anything else yields nothing.
                        List<Entity> entities = new ArrayList<>();
                        if (ref.getType().equals(User.ENTITY_TYPE)) {
                            Query devicesQuery = new Query();
                            devicesQuery.setCollection("devices");
                            devicesQuery.setResultsLevel(Query.Level.CORE_PROPERTIES);
                            devicesQuery.setLimit(50); // for now, assume a user has no more than 50 devices
                            try {
                                entities = em.searchCollection(new SimpleEntityRef("user", ref.getUuid()),
                                    devicesQuery.getCollection(), devicesQuery).getEntities();
                            } catch (Exception e) {
                                logger.error("Unable to load devices for user: {}", ref.getUuid(), e);
                                return Observable.empty();
                            }
                        } else if (ref.getType().equals(Device.ENTITY_TYPE)) {
                            try {
                                entities.add(em.get(ref));
                            } catch (Exception e) {
                                logger.error("Unable to load device: {}", ref.getUuid(), e);
                                return Observable.empty();
                            }
                        }
                        return Observable.from(entities);
                    })
                    .filter(device -> {
                        if (logger.isTraceEnabled()) {
                            logger.trace("Filtering device: {}", device.getUuid());
                        }
                        // Filters only apply to graph traversal; a missing or empty filter map lets everything through.
                        if (!notification.getUseGraph() || filters == null || filters.isEmpty()) {
                            return true;
                        }
                        // A device passes as soon as ANY filter entry equals its dynamic or regular property.
                        for (Map.Entry<String, Object> entry : filters.entrySet()) {
                            final Object dynamicValue = device.getDynamicProperties().get(entry.getKey());
                            if (dynamicValue != null && dynamicValue.equals(entry.getValue())) {
                                return true;
                            }
                            final Object propertyValue = device.getProperties().get(entry.getKey());
                            if (propertyValue != null && propertyValue.equals(entry.getValue())) {
                                return true;
                            }
                        }
                        if (logger.isTraceEnabled()) {
                            logger.trace("Push notification filter did not match for device {} on notification {}, so removing from notification",
                                device.getUuid(), notification.getUuid());
                        }
                        return false;
                    })
                    .map(sendMessageFunction)
                    .subscribeOn(Schedulers.io());
                }, concurrencyFactor)
                .distinct(queueMessage -> {
                    if (queueMessage.isPresent()) {
                        return queueMessage.get().getDeviceId();
                    }
                    return UUIDUtils.newTimeUUID(); // this should be distinct, default handling for the Optional.empty() case
                })
                .doOnNext(message -> {
                    try {
                        if (message.isPresent()) {
                            if (logger.isTraceEnabled()) {
                                logger.trace("Queueing notification message for device: {}", message.get().getDeviceId());
                            }
                            qm.sendMessageToLocalRegion(message.get(), Boolean.TRUE);
                            queueMeter.mark();
                        }
                    } catch (Exception e) {
                        // Best effort: a single failed send must not terminate the stream.
                        if (message.isPresent()) {
                            logger.error("Unable to queue notification for notification UUID {} and device UUID {} ",
                                message.get().getNotificationId(), message.get().getDeviceId(), e);
                        } else {
                            logger.error("Unable to queue notification as it's not present when trying to send to queue", e);
                        }
                    }
                })
                .doOnError(throwable -> {
                    logger.error("Error while processing devices for notification : {}, error: {}",
                        notification.getUuid(), throwable.getMessage(), throwable);
                    // -1 marks the processing as failed/partial.
                    notification.setProcessingFinished(-1L);
                    notification.setDeviceProcessedCount(deviceCount.get());
                    logger.warn("Partial notification. Only {} devices processed for notification {}",
                        deviceCount.get(), notification.getUuid());
                    try {
                        em.update(notification);
                    } catch (Exception e) {
                        logger.error("Error updating negative processing status when processing failed.", e);
                    }
                })
                .doOnCompleted(() -> {
                    try {
                        notification.setProcessingFinished(System.currentTimeMillis());
                        notification.setDeviceProcessedCount(deviceCount.get());
                        em.update(notification);
                        if (logger.isTraceEnabled()) {
                            logger.trace("Notification {} finished processing {} device(s)",
                                notification.getUuid(), deviceCount.get());
                        }
                    } catch (Exception e) {
                        logger.error("Unable to set processing finished timestamp for notification", e);
                    }
                });

        // Fire the queuing into the background. An explicit (no-op) onError handler is supplied
        // because the no-arg subscribe() rethrows stream errors as OnErrorNotImplementedException
        // on the scheduler thread; the failure has already been logged and persisted by doOnError.
        processMessagesObservable
            .subscribeOn(Schedulers.io())
            .subscribe(ignored -> { }, alreadyHandled -> { });
    }

    // NOTE(review): the pipeline above runs asynchronously, so deviceCount, errorMessages and
    // notification.getQueued() may not yet reflect any processed device when read below.
    // Confirm that marking the batch finished at this point is intended.

    // update queued time
    final Map<String, Object> properties = new HashMap<>(2);
    properties.put("queued", notification.getQueued());
    properties.put("state", notification.getState());
    if (!errorMessages.isEmpty() && notification.getErrorMessage() == null) {
        // NOTE(review): this text points at "deliveryErrors", but no such property is written
        // in this method — verify whether it is populated elsewhere.
        notification.setErrorMessage("There was a problem delivering all of your notifications. See deliveryErrors in properties");
    }
    notification.addProperties(properties);
    em.update(notification);

    // if no devices, go ahead and mark the batch finished
    if (deviceCount.get() <= 0) {
        TaskManager taskManager = new TaskManager(em, notification);
        taskManager.finishedBatch();
    }
}