in stack/services/src/main/java/org/apache/usergrid/services/notifications/QueueListener.java [146:303]
/**
 * Worker loop for a single listener thread.
 *
 * <p>Each round optionally sleeps {@code sleepBetweenRuns} ms, asks the queue named
 * {@code queueName} for up to {@code MAX_TAKE} messages, groups them by application id,
 * hands each group to that application's {@link ApplicationQueueManager}, waits for all
 * sends to finish and then commits the batch. The {@code execute.commit} meter is marked
 * with the number of committed messages and the {@code execute.dequeue} timer records the
 * duration of each round (excluding the failure back-off sleep).
 *
 * <p>When a round yields no messages the thread sleeps {@code sleepWhenNoneFound} ms.
 * When a round fails, the error is logged, the queue name cache is cleared and the thread
 * backs off for {@code sleepWhenNoneFound * consecutiveFailures} ms, capped at 15 seconds.
 *
 * <p>The loop ends when the executing thread is interrupted; the interrupt flag is left
 * set so the caller can observe it.
 *
 * @param threadNumber index of this worker; used only as a suffix of the thread name
 */
private void execute(int threadNumber){
    // The daemon flag is intentionally not touched here: Thread.setDaemon() throws
    // IllegalThreadStateException when invoked on a thread that is already running.
    Thread.currentThread().setName(getClass().getSimpleName()+"_Push-"+ RandomStringUtils.randomAlphanumeric(4)+"-"+threadNumber);

    final AtomicInteger consecutiveExceptions = new AtomicInteger();

    if (logger.isTraceEnabled()) {
        logger.trace("QueueListener: Starting execute process.");
    }

    Meter meter = metricsService.getMeter(QueueListener.class, "execute.commit");
    com.codahale.metrics.Timer timer = metricsService.getTimer(QueueListener.class, "execute.dequeue");

    if (logger.isTraceEnabled()) {
        logger.trace("getting from queue {} ", queueName);
    }

    LegacyQueueScope queueScope = new LegacyQueueScopeImpl( queueName, LegacyQueueScope.RegionImplementation.LOCAL);
    LegacyQueueManager legacyQueueManager = queueManagerFactory.getQueueManager(queueScope);

    // counts non-empty batches since the last inactive-device check
    final AtomicLong runCount = new AtomicLong(0);

    // run until this thread is interrupted
    while ( !Thread.currentThread().isInterrupted() ) {

        if (sleepBetweenRuns > 0) {
            if (logger.isTraceEnabled()) {
                logger.trace("sleep between rounds...sleep...{}", sleepBetweenRuns);
            }
            try {
                Thread.sleep(sleepBetweenRuns);
            } catch (InterruptedException ie) {
                // restore the flag and stop the worker instead of swallowing the request
                Thread.currentThread().interrupt();
                break;
            }
        }

        Timer.Context timerContext = timer.time();
        try {
            try {
                // getMessages() is evaluated inside the try so that a dequeue failure is
                // logged and backed off like any other failure of the round.
                rx.Observable.from( legacyQueueManager.getMessages(MAX_TAKE, ApplicationQueueMessage.class))
                    // toList() always emits exactly one list, even for an empty poll, so the
                    // "no messages" branch below is reached. At most MAX_TAKE messages are
                    // requested, so this is presumably the same batch buffer(MAX_TAKE) produced.
                    .toList()
                    .doOnNext(messages -> {
                        if (logger.isTraceEnabled()) {
                            logger.trace("retrieved batch of {} messages from queue {}",messages.size(),queueName);
                        }

                        if (messages.isEmpty()) {
                            if (logger.isTraceEnabled()) {
                                logger.trace("no messages...sleep...{}", sleepWhenNoneFound);
                            }
                            try {
                                Thread.sleep(sleepWhenNoneFound);
                            } catch (InterruptedException ie) {
                                // checked exceptions cannot leave the lambda; keep the flag
                                // set so the loop condition ends the worker
                                Thread.currentThread().interrupt();
                            }
                            return;
                        }

                        // group messages by application id
                        // (they are all probably going to the same place)
                        Map<UUID, List<LegacyQueueMessage>> messageMap = new HashMap<>(messages.size());
                        for (LegacyQueueMessage message : messages) {
                            //TODO: stop copying around this area as it gets notification specific.
                            ApplicationQueueMessage queueMessage = (ApplicationQueueMessage) message.getBody();
                            messageMap
                                .computeIfAbsent(queueMessage.getApplicationId(), appId -> new ArrayList<>())
                                .add(message);
                        }

                        long now = System.currentTimeMillis();

                        // raw type kept: the element type of sendBatchToProviders is not visible here
                        Observable merge = null;

                        //send each set of app ids together
                        for (Map.Entry<UUID, List<LegacyQueueMessage>> entry : messageMap.entrySet()) {
                            UUID applicationId = entry.getKey();
                            ApplicationQueueManager manager = applicationQueueManagerCache
                                .getApplicationQueueManager(
                                    emf.getEntityManager(applicationId),
                                    legacyQueueManager,
                                    new JobScheduler(smf.getServiceManager(applicationId),
                                        emf.getEntityManager(applicationId)),
                                    metricsService,
                                    properties
                                );

                            if (logger.isTraceEnabled()) {
                                logger.trace("send batch for app {} of {} messages",
                                    entry.getKey(), entry.getValue().size());
                            }

                            Observable current = manager.sendBatchToProviders(entry.getValue(),queueName);
                            if (merge == null) {
                                merge = current;
                            } else {
                                merge = Observable.merge(merge,current);
                            }
                        }

                        // wait for every per-application send to finish before committing
                        if (merge != null) {
                            merge.toBlocking().lastOrDefault(null);
                        }

                        legacyQueueManager.commitMessages(messages);
                        meter.mark(messages.size());

                        if (logger.isTraceEnabled()) {
                            logger.trace("sent batch {} messages duration {} ms",
                                messages.size(), System.currentTimeMillis() - now);
                        }

                        // periodically ask every cached manager to check for inactive devices
                        if (runCount.incrementAndGet() % consecutiveCallsToRemoveDevices == 0) {
                            for (ApplicationQueueManager aqm : applicationQueueManagerCache.asMap().values()) {
                                try {
                                    aqm.asyncCheckForInactiveDevices();
                                } catch (Exception inactiveDeviceException) {
                                    logger.error("Inactive Device Get failed",inactiveDeviceException);
                                }
                            }
                            //clear everything
                            runCount.set(0);
                        }
                    })
                    .toBlocking().lastOrDefault(null);
            } finally {
                // stop the timer on success and on failure, before any back-off sleep
                timerContext.stop();
            }
            consecutiveExceptions.set(0);
        } catch (Exception ex) {
            logger.error("failed to dequeue",ex);
            // clear the queue name cache b/c tests might have wiped the keyspace
            legacyQueueManager.clearQueueNameCache();

            // linear back-off, computed in long arithmetic and capped at 15 seconds
            final long maxSleep = 15000;
            final long failures = consecutiveExceptions.incrementAndGet();
            final long sleeptime = Math.min(maxSleep, failures * sleepWhenNoneFound);
            logger.info("sleeping due to failures {} ms", sleeptime);
            try {
                Thread.sleep(sleeptime);
            } catch (InterruptedException ie) {
                if (logger.isTraceEnabled()) {
                    logger.trace("sleep interrupted");
                }
                Thread.currentThread().interrupt();
            }
        }
    }
}