final CompactionResult compact()

in oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/AbstractCompactionStrategy.java [132:292]


    final CompactionResult compact(Context context, NodeState base) {
        context.getGCListener().info("running {} compaction", formatCompactionType(getCompactionType()));

        GCGeneration baseGeneration = getGcGeneration(context);
        GCGeneration partialGeneration = partialGeneration(baseGeneration);
        GCGeneration targetGeneration = targetGeneration(baseGeneration);
        GCIncrement gcIncrement = new GCIncrement(baseGeneration, partialGeneration, targetGeneration);

        try {
            PrintableStopwatch watch = PrintableStopwatch.createStarted();
            context.getGCListener().info(
                "compaction started, gc options={},\n{}",
                context.getGCOptions(),
                gcIncrement
            );
            context.getGCListener().updateStatus(COMPACTION.message());

            GCJournal.GCJournalEntry gcEntry = context.getGCJournal().read();
            long initialSize = size(context);

            CompactionWriter writer = new CompactionWriter(
                    context.getSegmentReader(),
                    context.getBlobStore(),
                    gcIncrement,
                    context.getSegmentWriterFactory());

            context.getCompactionMonitor().init(gcEntry.getRepoSize(), gcEntry.getNodes(), initialSize);

            Canceller hardCanceller = context.getHardCanceller().withShortCircuit();
            Canceller softCanceller = context.getSoftCanceller().withShortCircuit();

            Compactor compactor = newCompactor(context, writer);
            CompactedNodeState compacted = null;

            int cycles;
            boolean success;
            final int retryCount = Math.max(0, context.getGCOptions().getRetryCount());

            SegmentNodeState head;
            Flusher flusher = () -> {
                writer.flush();
                context.getFlusher().flush();
            };

            do {
                head = getHead(context);
                SegmentNodeState after = (compacted == null) ? head : compacted;
                Canceller stateSaveTrigger = context.getStateSaveTriggerSupplier().get().withShortCircuit();

                if (stateSaveTrigger.isCancelable()) {
                    context.getGCListener().info("intermediate state save enabled.");
                    Canceller saveStateCanceller = softCanceller.withCondition(
                            "save intermediate compaction state", () -> stateSaveTrigger.check().isCancelled());
                    compacted = compactor.compactDown(base, after, hardCanceller, saveStateCanceller);
                } else if (softCanceller.isCancelable()) {
                    context.getGCListener().info("soft cancellation enabled.");
                    compacted = compactor.compactDown(base, after, hardCanceller, softCanceller);
                } else {
                    compacted = compactor.compactUp(base, after, hardCanceller);
                }

                if (compacted == null) {
                    context.getGCListener().warn("compaction cancelled: {}.",
                            hardCanceller.check().getReason().orElse("unknown reason"));
                    return compactionAborted(context, targetGeneration);
                }

                context.getGCListener().info("compaction cycle 0 completed in {}. Compacted {} to {}",
                        watch, head.getRecordId(), compacted.getRecordId());

                cycles = 0;

                while (!(success = setHead(context, head, compacted)) && cycles < retryCount) {
                    // Some other concurrent changes have been made.
                    // Rebase (and compact) those changes on top of the
                    // compacted state before retrying to set the head.
                    cycles++;
                    context.getGCListener().info("compaction detected concurrent commits while compacting. " +
                                    "Compacting these commits. Cycle {} of {}", cycles, retryCount);
                    context.getGCListener().updateStatus(COMPACTION_RETRY.message() + cycles);
                    PrintableStopwatch cycleWatch = PrintableStopwatch.createStarted();

                    SegmentNodeState newHead = getHead(context);
                    compacted = compactor.compact(head, newHead,compacted, hardCanceller);
                    if (compacted == null) {
                        context.getGCListener().warn("compaction cancelled: {}.",
                                hardCanceller.check().getReason().orElse("unknown reason"));
                        return compactionAborted(context, targetGeneration);
                    }

                    context.getGCListener().info("compaction cycle {} completed in {}. Compacted {} against {} to {}",
                            cycles, cycleWatch, head.getRecordId(), newHead.getRecordId(), compacted.getRecordId());
                    head = newHead;
                }

                if (success) {
                    flusher.flush();
                }
            } while (success && !compacted.isComplete() && !softCanceller.check().isCancelled());

            if (!success) {
                context.getGCListener().info("compaction gave up compacting concurrent commits after {} cycles.", cycles);
                int forceTimeout = context.getGCOptions().getForceTimeout();
                if (forceTimeout > 0) {
                    context.getGCListener().info("trying to force compact remaining commits for {} seconds. " +
                            "Concurrent commits to the store will be blocked.",
                        forceTimeout);
                    context.getGCListener().updateStatus(COMPACTION_FORCE_COMPACT.message());
                    PrintableStopwatch forceWatch = PrintableStopwatch.createStarted();

                    cycles++;

                    Canceller forcedCompactionCanceller = hardCanceller
                        .withTimeout("forced compaction timeout exceeded", forceTimeout, SECONDS)
                        .withShortCircuit();
                    compacted = forceCompact(context, head, compacted, compactor, forcedCompactionCanceller);
                    if (compacted != null) {
                        success = true;
                        flusher.flush();
                        context.getGCListener().info("compaction succeeded to force compact remaining commits after {}.", forceWatch);
                    } else {
                        Cancellation cancellation = forcedCompactionCanceller.check();
                        if (cancellation.isCancelled()) {
                            context.getGCListener().warn("compaction failed to force compact remaining commits " +
                                    "after {}. Compaction was cancelled: {}.",
                                forceWatch, cancellation.getReason().orElse("unknown reason"));
                        } else {
                            context.getGCListener().warn("compaction failed to force compact remaining commits. " +
                                    "after {}. Could not acquire exclusive access to the node store.",
                                forceWatch);
                        }
                    }
                }
            }

            if (success) {
                // Update type of the last compaction before calling methods that could throw an exception.
                context.getSuccessfulCompactionListener().onSuccessfulCompaction(getCompactionType());
                context.getCompactionMonitor().finished();

                if (compacted.isComplete()) {
                    context.getGCListener().info("compaction succeeded in {}, after {} cycles", watch, cycles);
                    return compactionSucceeded(context, targetGeneration, compacted.getRecordId());
                } else {
                    context.getGCListener().info("compaction partially succeeded in {}: {}.",
                            watch, softCanceller.check().getReason().orElse("unknown reason"));
                    return compactionPartiallySucceeded(context, partialGeneration, compacted.getRecordId());
                }
            } else {
                context.getGCListener().info("compaction failed after {}, and {} cycles", watch, cycles);
                return compactionAborted(context, targetGeneration);
            }
        } catch (InterruptedException e) {
            context.getGCListener().error("compaction interrupted", e);
            currentThread().interrupt();
            return compactionAborted(context, targetGeneration);
        } catch (Throwable e) {
            context.getGCListener().error("compaction encountered an error", e instanceof Exception ? (Exception) e : new Exception(e));
            return compactionAborted(context, targetGeneration);
        }
    }