Set<Integer> checkpointUpdate()

in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java [1813:2061]


    Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
        MDC.put("activemq.persistenceDir", getDirectory().getName());
        LOG.debug("Checkpoint started.");

        // snapshot the last journal update before this checkpoint writes anything;
        // cleanup below must not delete past it
        Location lastUpdate = metadata.lastUpdate;

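        // Persist a consistent metadata snapshot: the producer sequence audit,
        // the ack message file map (only when dirty), and the location of the
        // earliest in-progress transaction, so recovery can resume from a
        // known-good point.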
        metadata.state = OPEN_STATE;
        metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
        if (metadata.ackMessageFileMapDirtyFlag.get() || (metadata.ackMessageFileMapLocation == null)) {
            metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
        }
        metadata.ackMessageFileMapDirtyFlag.lazySet(false);
        Location[] inProgressTxRange = getInProgressTxLocationRange();
        metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
        tx.store(metadata.page, metadataMarshaller, true);

        final TreeSet<Integer> gcCandidateSet = new TreeSet<>();
        if (cleanup) {

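            // Start with every journal file as a GC candidate, then whittle the
            // set down to the files that nothing references any more.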
            final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
            gcCandidateSet.addAll(completeFileSet);

            if (LOG.isTraceEnabled()) {
                LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
            }

            if (lastUpdate != null) {
                // we won't delete past the last update; an ack-compaction journal
                // file could otherwise show up as a candidate in error
                gcCandidateSet.removeAll(new TreeSet<Integer>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
            }

            // Don't GC files under replication
            if( journalFilesBeingReplicated!=null ) {
                gcCandidateSet.removeAll(journalFilesBeingReplicated);
            }

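            // The metadata page itself points at journal locations (producer
            // audit, ack file map); those files must survive. If the audit alone
            // pins the oldest candidate, mark it modified so a later checkpoint
            // rewrites it into a newer file and releases this one.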
            if (metadata.producerSequenceIdTrackerLocation != null) {
                int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
                if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
                    // rewrite so we don't prevent gc
                    metadata.producerSequenceIdTracker.setModified(true);
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
                    }
                }
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
                }
            }

            if (metadata.ackMessageFileMapLocation != null) {
                int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
                gcCandidateSet.remove(dataFileId);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
                }
            }

            // Don't GC files referenced by in-progress tx
            if (inProgressTxRange[0] != null) {
                for (int pendingTx=inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
                    gcCandidateSet.remove(pendingTx);
                }
            }
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
            }

            // Go through all the destinations to see if any of them can remove GC candidates.
            for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
                if( gcCandidateSet.isEmpty() ) {
                    break;
                }

                // Use a visitor to cut down the number of pages that we load
                entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                    int last=-1;
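                    // The boundary keys handed to this callback are visited in
                    // their own right elsewhere, so a subtree is only worth
                    // descending into if it could clear a candidate strictly
                    // inside the (first, second) range.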
                    @Override
                    public boolean isInterestedInKeysBetween(Location first, Location second) {
                        if( first==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else if( second==null ) {
                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else {
                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId()+1);
                            if( !subset.isEmpty() && subset.first() == first.getDataFileId() ) {
                                subset.remove(first.getDataFileId());
                            }
                            if( !subset.isEmpty() && subset.last() == second.getDataFileId() ) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        }
                    }

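                    // Any file that still holds a live message location is not
                    // a GC candidate; 'last' skips redundant removes for runs
                    // of keys that share a data file.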
                    @Override
                    public void visit(List<Location> keys, List<Long> values) {
                        for (Location l : keys) {
                            int fileId = l.getDataFileId();
                            if( last != fileId ) {
                                gcCandidateSet.remove(fileId);
                                last = fileId;
                            }
                        }
                    }
                });

                // Durable subscriptions: each KahaSubscriptionCommand also lives
                // in the journal, so its file is pinned while the subscription is active.
                if (entry.getValue().subLocations != null) {
                    Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
                    while (iter.hasNext()) {
                        Entry<String, Location> subscription = iter.next();
                        int dataFileId = subscription.getValue().getDataFileId();

                        // Rewrite the subscription forward if it has no outstanding
                        // messages that need to be ack'd and it sits in the oldest
                        // journal file that is still a GC candidate.
                        if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
                            final StoredDestination destination = entry.getValue();
                            final String subscriptionKey = subscription.getKey();
                            SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);

                            // A pending set whose single entry spans one value holds
                            // only the next message Id, meaning there are currently
                            // no pending messages.
                            if (pendingAcks == null || pendingAcks.isEmpty() ||
                                (pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {

                                if (LOG.isTraceEnabled()) {
                                    LOG.trace("Found candidate for rewrite: sub {} on {} from file {}", subscriptionKey, entry.getKey(), dataFileId);
                                }

                                final KahaSubscriptionCommand kahaSub =
                                    destination.subscriptions.get(tx, subscriptionKey);
                                destination.subLocations.put(
                                    tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));

                                // Skip the remove from candidates when we rewrote the
                                // subscription, to prevent duplicate subscription
                                // commands on recovery. If another subscription in the
                                // same file isn't rewritten, then it will remove the
                                // file from the set.
                                continue;
                            }
                        }

                        if (LOG.isTraceEnabled()) {
                            final StoredDestination destination = entry.getValue();
                            final String subscriptionKey = subscription.getKey();
                            final SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
                            LOG.trace("sub {} on {} in dataFile {} has pendingCount {}", subscriptionKey, entry.getKey(), dataFileId, pendingAcks.rangeSize()-1);
                        }
                        gcCandidateSet.remove(dataFileId);
                    }
                }

                if (LOG.isTraceEnabled()) {
                    LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
                }
            }

            // check we are not deleting a file whose acks refer to in-use journal files
            if (LOG.isTraceEnabled()) {
                LOG.trace("gc candidates: " + gcCandidateSet);
                LOG.trace("ackMessageFileMap: " +  metadata.ackMessageFileMap);
            }

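            // A candidate may contain acks for messages that live in a file we
            // are keeping; deleting it would replay those messages on recovery,
            // so such a candidate must be retained.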
            boolean ackMessageFileMapMod = false;
            Iterator<Integer> candidates = gcCandidateSet.iterator();
            while (candidates.hasNext()) {
                Integer candidate = candidates.next();
                Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
                if (referencedFileIds != null) {
                    for (Integer referencedFileId : referencedFileIds) {
                        if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                            // active file that is not targeted for deletion is referenced so don't delete
                            candidates.remove();
                            break;
                        }
                    }
                    if (gcCandidateSet.contains(candidate)) {
                        ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
                        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
                    } else {
                        if (LOG.isTraceEnabled()) {
                            LOG.trace("not removing data file: " + candidate
                                    + " as contained ack(s) refer to referenced file: " + referencedFileIds);
                        }
                    }
                }
            }

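            // For the files that will be removed, scrub their ids out of the
            // ack message file map, then run a non-cleanup checkpoint so the
            // trimmed map is persisted before the files disappear.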
            if (!gcCandidateSet.isEmpty()) {
                LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
                for (Integer candidate : gcCandidateSet) {
                    for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
                        ackMessageFileMapMod |= ackFiles.remove(candidate);
                        metadata.ackMessageFileMapDirtyFlag.lazySet(true);
                    }
                }
                if (ackMessageFileMapMod) {
                    checkpointUpdate(tx, false);
                }
            } else if (isEnableAckCompaction()) {
                if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
                    // First check the length of the journal to make sure it makes
                    // sense to even try.
                    //
                    // If there is only one journal file with acks in it we don't
                    // need to move it, since it won't be chained to any later logs.
                    //
                    // If the logs haven't grown since the last check then we need
                    // to compact; otherwise there still seems to be room for growth
                    // and we don't need to incur the overhead. Depending on
                    // configuration this check can be skipped, in which case ack
                    // compaction runs any time the store has not GC'd a journal
                    // file in the configured number of cycles.
                    if (metadata.ackMessageFileMap.size() > 1 &&
                        (journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {

                        LOG.trace("No files GC'd; checking whether the ACK compaction threshold has been met.");
                        try {
                            scheduler.execute(new AckCompactionRunner());
                        } catch (Exception ex) {
                            LOG.warn("Error on queueing the Ack Compactor", ex);
                        }
                    } else {
                        LOG.trace("Journal activity detected, no Ack compaction scheduled.");
                    }

                    checkPointCyclesWithNoGC = 0;
                } else {
                    LOG.trace("Not yet time to check for compaction: {} of {} cycles",
                              checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
                }

                journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
            }
        }
        MDC.remove("activemq.persistenceDir");

        LOG.debug("Checkpoint done.");
        return gcCandidateSet;
    }
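
For context, this transactional variant is normally driven by a wrapper that runs it inside a single page-file transaction and deletes the returned journal files only after the index has been flushed. The sketch below shows that calling pattern; it is illustrative rather than the verbatim wrapper in MessageDatabase, and it assumes the class's pageFile, indexLock, and journal fields plus a journal-level removeDataFiles cleanup call:

    // Illustrative wrapper, not verbatim source: runs the index update in one
    // page-file transaction and deletes files only after the index is flushed.
    void checkpointUpdate(final boolean cleanup) throws IOException {
        Set<Integer> filesToGc;
        indexLock.writeLock().lock();
        try {
            filesToGc = pageFile.tx().execute(
                new Transaction.CallableClosure<Set<Integer>, IOException>() {
                    @Override
                    public Set<Integer> execute(Transaction tx) throws IOException {
                        return checkpointUpdate(tx, cleanup);
                    }
                });
        } finally {
            indexLock.writeLock().unlock();
        }
        // Flush the index before physically deleting journal files so the
        // persisted metadata never references a file that no longer exists.
        pageFile.flush();
        if (!filesToGc.isEmpty()) {
            journal.removeDataFiles(filesToGc); // assumed cleanup API
        }
    }

Returning the candidate set instead of deleting inside the transaction keeps slow file deletion outside the index work and guarantees the on-disk metadata never points at a removed file.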