public static List getUnackedSubscriptions()

in activemq-kahadb-exporter/src/main/java/org/apache/activemq/store/kahadb/KahaDBUtil.java [41:82]


    public static List<SubscriptionInfo> getUnackedSubscriptions(KahaDBStore store, Message message)
            throws Exception {

        final List<SubscriptionInfo> matching = new ArrayList<>();

        if (!message.getDestination().isTopic()) {
            return matching;
        }

        ActiveMQTopic topic = (ActiveMQTopic) message.getDestination();
        String messageId = message.getMessageId().toString();
        TopicMessageStore messageStore =  store.createTopicMessageStore(topic);

        store.indexLock.writeLock().lock();

        final SubscriptionInfo[] infos = messageStore.getAllSubscriptions();

        try {
            store.pageFile.tx().execute(new Transaction.Closure<Exception>() {
                @Override
                public void execute(Transaction tx) throws Exception {
                    StoredDestination sd = store.getStoredDestination(store.convert(topic), tx);

                    if (sd != null) {
                        Long position = sd.messageIdIndex.get(tx, messageId);

                        for (SubscriptionInfo info : infos) {
                            LastAck cursorPos = store.getLastAck(tx, sd,
                                    store.subscriptionKey(info.getClientId(), info.getSubcriptionName()));
                            if (cursorPos.lastAckedSequence < position) {
                                matching.add(info);
                            }
                        }
                    }
                }
            });
        } finally {
            store.indexLock.writeLock().unlock();
        }

        return matching;
    }