in activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java [1813:2061]
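    /**
     * Persists the in-memory metadata and, when {@code cleanup} is true, works out
     * which journal data files are no longer referenced: starting from the full set
     * of journal files, every file still pinned by the metadata, by in-progress
     * transactions, by destination indexes, by durable subscriptions or by acks for
     * messages in live files is pruned. The surviving candidates are returned for
     * the caller to delete.
     */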
Set<Integer> checkpointUpdate(Transaction tx, boolean cleanup) throws IOException {
MDC.put("activemq.persistenceDir", getDirectory().getName());
LOG.debug("Checkpoint started.");
        // snapshot the last update location, exclusive of the writes made by this checkpoint itself
Location lastUpdate = metadata.lastUpdate;
metadata.state = OPEN_STATE;
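        // Persist the producer sequence id tracker (the duplicate-suppression audit)
        // and remember where it was written so recovery can reload it.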
metadata.producerSequenceIdTrackerLocation = checkpointProducerAudit();
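        // Only rewrite the ack message file map when it has changed since the last
        // checkpoint, or when it has never been written.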
if (metadata.ackMessageFileMapDirtyFlag.get() || (metadata.ackMessageFileMapLocation == null)) {
metadata.ackMessageFileMapLocation = checkpointAckMessageFileMap();
}
metadata.ackMessageFileMapDirtyFlag.lazySet(false);
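        // Capture the journal range still covered by in-progress transactions; the
        // start location is persisted with the metadata and the whole range is
        // excluded from GC below.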
Location[] inProgressTxRange = getInProgressTxLocationRange();
metadata.firstInProgressTransactionLocation = inProgressTxRange[0];
tx.store(metadata.page, metadataMarshaller, true);
final TreeSet<Integer> gcCandidateSet = new TreeSet<>();
if (cleanup) {
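            // Start with every journal file as a deletion candidate and prune any
            // file that something still references.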
final TreeSet<Integer> completeFileSet = new TreeSet<>(journal.getFileMap().keySet());
gcCandidateSet.addAll(completeFileSet);
if (LOG.isTraceEnabled()) {
LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet);
}
if (lastUpdate != null) {
                // never delete past the last update; a journal file written by ack
                // compaction could otherwise show up as a candidate in error. The tail
                // view is copied first so we don't remove elements through a live view.
                gcCandidateSet.removeAll(new TreeSet<>(gcCandidateSet.tailSet(lastUpdate.getDataFileId())));
}
// Don't GC files under replication
            if (journalFilesBeingReplicated != null) {
gcCandidateSet.removeAll(journalFilesBeingReplicated);
}
if (metadata.producerSequenceIdTrackerLocation != null) {
int dataFileId = metadata.producerSequenceIdTrackerLocation.getDataFileId();
if (gcCandidateSet.contains(dataFileId) && gcCandidateSet.first() == dataFileId) {
                    // force a rewrite on the next checkpoint so this tracker stops pinning the oldest journal file
metadata.producerSequenceIdTracker.setModified(true);
if (LOG.isTraceEnabled()) {
LOG.trace("rewriting producerSequenceIdTracker:" + metadata.producerSequenceIdTrackerLocation);
}
}
gcCandidateSet.remove(dataFileId);
if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates after producerSequenceIdTrackerLocation:" + metadata.producerSequenceIdTrackerLocation + ", " + gcCandidateSet);
}
}
if (metadata.ackMessageFileMapLocation != null) {
int dataFileId = metadata.ackMessageFileMapLocation.getDataFileId();
gcCandidateSet.remove(dataFileId);
if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates after ackMessageFileMapLocation:" + metadata.ackMessageFileMapLocation + ", " + gcCandidateSet);
}
}
// Don't GC files referenced by in-progress tx
if (inProgressTxRange[0] != null) {
                for (int pendingTx = inProgressTxRange[0].getDataFileId(); pendingTx <= inProgressTxRange[1].getDataFileId(); pendingTx++) {
gcCandidateSet.remove(pendingTx);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates after in progress tx range:" + Arrays.asList(inProgressTxRange) + ", " + gcCandidateSet);
}
// Go through all the destinations to see if any of them can remove GC candidates.
for (Entry<String, StoredDestination> entry : storedDestinations.entrySet()) {
                if (gcCandidateSet.isEmpty()) {
break;
}
// Use a visitor to cut down the number of pages that we load
entry.getValue().locationIndex.visit(tx, new BTreeVisitor<Location, Long>() {
                    int last = -1;
@Override
public boolean isInterestedInKeysBetween(Location first, Location second) {
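                        // Note: headSet/tailSet/subSet return live views backed by
                        // gcCandidateSet, so removing a boundary file id below prunes
                        // it from the candidate set itself; a boundary key proves its
                        // file is still referenced by this index.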
                        if (first == null) {
                            SortedSet<Integer> subset = gcCandidateSet.headSet(second.getDataFileId() + 1);
                            if (!subset.isEmpty() && subset.last() == second.getDataFileId()) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else if (second == null) {
                            SortedSet<Integer> subset = gcCandidateSet.tailSet(first.getDataFileId());
                            if (!subset.isEmpty() && subset.first() == first.getDataFileId()) {
                                subset.remove(first.getDataFileId());
                            }
                            return !subset.isEmpty();
                        } else {
                            SortedSet<Integer> subset = gcCandidateSet.subSet(first.getDataFileId(), second.getDataFileId() + 1);
                            if (!subset.isEmpty() && subset.first() == first.getDataFileId()) {
                                subset.remove(first.getDataFileId());
                            }
                            if (!subset.isEmpty() && subset.last() == second.getDataFileId()) {
                                subset.remove(second.getDataFileId());
                            }
                            return !subset.isEmpty();
                        }
}
@Override
public void visit(List<Location> keys, List<Long> values) {
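                        // Every visited key proves its data file is still referenced,
                        // so drop that file from the candidates; 'last' avoids repeated
                        // removals for runs of keys in the same file.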
for (Location l : keys) {
int fileId = l.getDataFileId();
                            if (last != fileId) {
gcCandidateSet.remove(fileId);
last = fileId;
}
}
}
});
                // Durable subscriptions: rewrite or retain the journal files holding their subscription commands
if (entry.getValue().subLocations != null) {
Iterator<Entry<String, Location>> iter = entry.getValue().subLocations.iterator(tx);
while (iter.hasNext()) {
Entry<String, Location> subscription = iter.next();
int dataFileId = subscription.getValue().getDataFileId();
                        // Move the subscription along if it has no outstanding messages that
                        // need to be ack'd and it lives in the oldest journal file still
                        // eligible for GC.
if (!gcCandidateSet.isEmpty() && gcCandidateSet.first() == dataFileId) {
final StoredDestination destination = entry.getValue();
final String subscriptionKey = subscription.getKey();
SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
                            // A pending set of size one holds only the next message id,
                            // meaning there are currently no pending messages.
if (pendingAcks == null || pendingAcks.isEmpty() ||
(pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found candidate for rewrite: sub {} on {} from file {}", subscriptionKey, entry.getKey(), dataFileId);
}
final KahaSubscriptionCommand kahaSub =
destination.subscriptions.get(tx, subscriptionKey);
destination.subLocations.put(
tx, subscriptionKey, checkpointSubscriptionCommand(kahaSub));
                                // Skip removing the file from the candidates when we rewrote
                                // the subscription, to prevent duplicate subscription commands
                                // on recovery. If another subscription lives in the same file
                                // and isn't rewritten, then it will remove the file from the set.
continue;
}
}
if (LOG.isTraceEnabled()) {
final StoredDestination destination = entry.getValue();
final String subscriptionKey = subscription.getKey();
final SequenceSet pendingAcks = destination.ackPositions.get(tx, subscriptionKey);
LOG.trace("sub {} on {} in dataFile {} has pendingCount {}", subscriptionKey, entry.getKey(), dataFileId, pendingAcks.rangeSize()-1);
}
gcCandidateSet.remove(dataFileId);
}
}
if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates after dest:" + entry.getKey() + ", " + gcCandidateSet);
}
}
            // check that we are not deleting a file whose acks refer to journal files still in use
if (LOG.isTraceEnabled()) {
LOG.trace("gc candidates: " + gcCandidateSet);
LOG.trace("ackMessageFileMap: " + metadata.ackMessageFileMap);
}
boolean ackMessageFileMapMod = false;
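            // Keep any candidate that holds acks for messages in an active file:
            // deleting it would lose those acks, and replaying the journal on
            // recovery could then resurrect already-acked messages.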
Iterator<Integer> candidates = gcCandidateSet.iterator();
while (candidates.hasNext()) {
Integer candidate = candidates.next();
Set<Integer> referencedFileIds = metadata.ackMessageFileMap.get(candidate);
if (referencedFileIds != null) {
for (Integer referencedFileId : referencedFileIds) {
if (completeFileSet.contains(referencedFileId) && !gcCandidateSet.contains(referencedFileId)) {
                            // an ack in this candidate references an active file that is not being deleted, so keep the candidate
candidates.remove();
break;
}
}
if (gcCandidateSet.contains(candidate)) {
ackMessageFileMapMod |= (metadata.ackMessageFileMap.remove(candidate) != null);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("not removing data file: " + candidate
+ " as contained ack(s) refer to referenced file: " + referencedFileIds);
}
}
}
}
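            // Whatever survived is safe to delete. Scrub the deleted files from the
            // ack message file map and, if the map changed, persist the metadata
            // again with a nested non-cleanup checkpoint.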
if (!gcCandidateSet.isEmpty()) {
LOG.debug("Cleanup removing the data files: {}", gcCandidateSet);
for (Integer candidate : gcCandidateSet) {
for (Set<Integer> ackFiles : metadata.ackMessageFileMap.values()) {
ackMessageFileMapMod |= ackFiles.remove(candidate);
metadata.ackMessageFileMapDirtyFlag.lazySet(true);
}
}
if (ackMessageFileMapMod) {
checkpointUpdate(tx, false);
}
} else if (isEnableAckCompaction()) {
if (++checkPointCyclesWithNoGC >= getCompactAcksAfterNoGC()) {
                    // First check the length of the journal to make sure it even makes
                    // sense to try.
                    //
                    // If there is only one journal file with acks in it we don't need to
                    // move it, since it won't be chained to any later logs.
                    //
                    // If the logs haven't grown since the last check then we should
                    // compact; otherwise there still seems to be room for growth and we
                    // don't need to incur the overhead. Depending on configuration this
                    // check can be skipped, in which case ack compaction runs any time
                    // the store has not GC'd a journal file in the configured number of
                    // cycles.
if (metadata.ackMessageFileMap.size() > 1 &&
(journalLogOnLastCompactionCheck == journal.getCurrentDataFileId() || isCompactAcksIgnoresStoreGrowth())) {
LOG.trace("No files GC'd checking if threshold to ACK compaction has been met.");
try {
scheduler.execute(new AckCompactionRunner());
} catch (Exception ex) {
LOG.warn("Error on queueing the Ack Compactor", ex);
}
} else {
LOG.trace("Journal activity detected, no Ack compaction scheduled.");
}
checkPointCyclesWithNoGC = 0;
} else {
LOG.trace("Not yet time to check for compaction: {} of {} cycles",
checkPointCyclesWithNoGC, getCompactAcksAfterNoGC());
}
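                // Remember the current head data file so the next cycle can tell
                // whether the journal has grown since this check.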
journalLogOnLastCompactionCheck = journal.getCurrentDataFileId();
}
}
MDC.remove("activemq.persistenceDir");
LOG.debug("Checkpoint done.");
return gcCandidateSet;
}