in oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java [132:292]
final CompactionResult compact(Context context, NodeState base) {
context.getGCListener().info("running {} compaction", formatCompactionType(getCompactionType()));
GCGeneration baseGeneration = getGcGeneration(context);
GCGeneration partialGeneration = partialGeneration(baseGeneration);
GCGeneration targetGeneration = targetGeneration(baseGeneration);
GCIncrement gcIncrement = new GCIncrement(baseGeneration, partialGeneration, targetGeneration);
try {
PrintableStopwatch watch = PrintableStopwatch.createStarted();
context.getGCListener().info(
"compaction started, gc options={},\n{}",
context.getGCOptions(),
gcIncrement
);
context.getGCListener().updateStatus(COMPACTION.message());
GCJournal.GCJournalEntry gcEntry = context.getGCJournal().read();
long initialSize = size(context);
CompactionWriter writer = new CompactionWriter(
context.getSegmentReader(),
context.getBlobStore(),
gcIncrement,
context.getSegmentWriterFactory());
context.getCompactionMonitor().init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize);
Canceller hardCanceller = context.getHardCanceller().withShortCircuit();
Canceller softCanceller = context.getSoftCanceller().withShortCircuit();
Compactor compactor = newCompactor(context, writer);
CompactedNodeState compacted = null;
int cycles;
boolean success;
final int retryCount = Math.max(0, context.getGCOptions().getRetryCount());
SegmentNodeState head;
Flusher flusher = () -> {
writer.flush();
context.getFlusher().flush();
};
do {
head = getHead(context);
SegmentNodeState after = (compacted == null) ? head : compacted;
Canceller stateSaveTrigger = context.getStateSaveTriggerSupplier().get().withShortCircuit();
if (stateSaveTrigger.isCancelable()) {
context.getGCListener().info("intermediate state save enabled.");
Canceller saveStateCanceller = softCanceller.withCondition(
"save intermediate compaction state", () -> stateSaveTrigger.check().isCancelled());
compacted = compactor.compactDown(base, after, hardCanceller, saveStateCanceller);
} else if (softCanceller.isCancelable()) {
context.getGCListener().info("soft cancellation enabled.");
compacted = compactor.compactDown(base, after, hardCanceller, softCanceller);
} else {
compacted = compactor.compactUp(base, after, hardCanceller);
}
if (compacted == null) {
context.getGCListener().warn("compaction cancelled: {}.",
hardCanceller.check().getReason().orElse("unknown reason"));
return compactionAborted(context, targetGeneration);
}
context.getGCListener().info("compaction cycle 0 completed in {}. Compacted {} to {}",
watch, head.getRecordId(), compacted.getRecordId());
cycles = 0;
while (!(success = setHead(context, head, compacted)) && cycles < retryCount) {
// Some other concurrent changes have been made.
// Rebase (and compact) those changes on top of the
// compacted state before retrying to set the head.
cycles++;
context.getGCListener().info("compaction detected concurrent commits while compacting. " +
"Compacting these commits. Cycle {} of {}", cycles, retryCount);
context.getGCListener().updateStatus(COMPACTION_RETRY.message() + cycles);
PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted();
SegmentNodeState newHead = getHead(context);
compacted = compactor.compact(head, newHead,compacted, hardCanceller);
if (compacted == null) {
context.getGCListener().warn("compaction cancelled: {}.",
hardCanceller.check().getReason().orElse("unknown reason"));
return compactionAborted(context, targetGeneration);
}
context.getGCListener().info("compaction cycle {} completed in {}. Compacted {} against {} to {}",
cycles, cycleWatch, head.getRecordId(), newHead.getRecordId(), compacted.getRecordId());
head = newHead;
}
if (success) {
flusher.flush();
}
} while (success && !compacted.isComplete() && !softCanceller.check().isCancelled());
if (!success) {
context.getGCListener().info("compaction gave up compacting concurrent commits after {} cycles.", cycles);
int forceTimeout = context.getGCOptions().getForceTimeout();
if (forceTimeout > 0) {
context.getGCListener().info("trying to force compact remaining commits for {} seconds. " +
"Concurrent commits to the store will be blocked.",
forceTimeout);
context.getGCListener().updateStatus(COMPACTION_FORCE_COMPACT.message());
PrintableStopwatch forceWatch = PrintableStopwatch.createStarted();
cycles++;
Canceller forcedCompactionCanceller = hardCanceller
.withTimeout("forced compaction timeout exceeded", forceTimeout, SECONDS)
.withShortCircuit();
compacted = forceCompact(context, head, compacted, compactor, forcedCompactionCanceller);
if (compacted != null) {
success = true;
flusher.flush();
context.getGCListener().info("compaction succeeded to force compact remaining commits after {}.", forceWatch);
} else {
Cancellation cancellation = forcedCompactionCanceller.check();
if (cancellation.isCancelled()) {
context.getGCListener().warn("compaction failed to force compact remaining commits " +
"after {}. Compaction was cancelled: {}.",
forceWatch, cancellation.getReason().orElse("unknown reason"));
} else {
context.getGCListener().warn("compaction failed to force compact remaining commits. " +
"after {}. Could not acquire exclusive access to the node store.",
forceWatch);
}
}
}
}
if (success) {
// Update type of the last compaction before calling methods that could throw an exception.
context.getSuccessfulCompactionListener().onSuccessfulCompaction(getCompactionType());
context.getCompactionMonitor().finished();
if (compacted.isComplete()) {
context.getGCListener().info("compaction succeeded in {}, after {} cycles", watch, cycles);
return compactionSucceeded(context, targetGeneration, compacted.getRecordId());
} else {
context.getGCListener().info("compaction partially succeeded in {}: {}.",
watch, softCanceller.check().getReason().orElse("unknown reason"));
return compactionPartiallySucceeded(context, partialGeneration, compacted.getRecordId());
}
} else {
context.getGCListener().info("compaction failed after {}, and {} cycles", watch, cycles);
return compactionAborted(context, targetGeneration);
}
} catch (InterruptedException e) {
context.getGCListener().error("compaction interrupted", e);
currentThread().interrupt();
return compactionAborted(context, targetGeneration);
} catch (Throwable e) {
context.getGCListener().error("compaction encountered an error", e instanceof Exception ? (Exception) e : new Exception(e));
return compactionAborted(context, targetGeneration);
}
}