int antiCompactGroup()

in src/java/org/apache/cassandra/db/compaction/CompactionManager.java [1883:2044]


    /**
     * Anticompacts the sstables held by {@code txn}: each partition read from the transaction's originals is
     * rewritten into exactly one of three new sstables, chosen by the partition's token:
     * <ul>
     *   <li>the "full" sstable, when the token falls in {@code ranges.onlyFull()};</li>
     *   <li>the "transient" sstable, when it falls in {@code ranges.onlyTransient()};</li>
     *   <li>the "unrepaired" sstable otherwise (created with {@code NO_PENDING_REPAIR}).</li>
     * </ul>
     * The full range is tested first; the transient range is only consulted when the full range does not match.
     * On success the three writers and {@code txn} are committed by this method.
     *
     * NOTE(review): {@code txn} is not closed here ({@code SharedTxn.close()} is a no-op) - presumably the
     * caller owns closing/aborting it; confirm against the call site.
     *
     * @param cfs           table whose sstables are being anticompacted
     * @param ranges        full and transient ranges to separate out; must not be empty
     * @param txn           transaction whose originals are the sstables to split
     * @param pendingRepair session id handed to the full and transient writers
     * @param isCancelled   passed to the anticompaction iterator, and consulted to pick the log message when a
     *                      {@code CompactionInterruptedException} is caught
     * @return the total number of sstables finished by the three writers, or 0 if the transaction has no originals
     * @throws IllegalArgumentException if {@code ranges} is empty (via {@code Preconditions.checkArgument})
     */
    int antiCompactGroup(ColumnFamilyStore cfs,
                         RangesAtEndpoint ranges,
                         LifecycleTransaction txn,
                         TimeUUID pendingRepair,
                         BooleanSupplier isCancelled)
    {
        Preconditions.checkArgument(!ranges.isEmpty(), "need at least one full or transient range");

        // Bail out before doing any other work when there is nothing left to anticompact.
        Set<SSTableReader> sstableAsSet = txn.originals();
        if (sstableAsSet.isEmpty())
        {
            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
            return 0;
        }

        // The rewriters are all constructed with the newest maxDataAge found among the originals.
        long groupMaxDataAge = -1;
        for (SSTableReader sstable : sstableAsSet)
            groupMaxDataAge = Math.max(groupMaxDataAge, sstable.maxDataAge);

        logger.info("Anticompacting {} in {}.{} for {}", sstableAsSet, cfs.getKeyspaceName(), cfs.getTableName(), pendingRepair);

        File destination = cfs.getDirectories().getWriteableLocationAsFile(cfs.getExpectedCompactedFileSize(sstableAsSet, OperationType.ANTICOMPACTION));
        long nowInSec = FBUtilities.nowInSeconds();
        RateLimiter limiter = getRateLimiter();

        /**
         * HACK WARNING
         *
         * We have multiple writers operating over the same Transaction, producing different sets of sstables that all
         * logically replace the transaction's originals.  The SSTableRewriter assumes it has exclusive control over
         * the transaction state, and this will lead to temporarily inconsistent sstable/tracker state if we do not
         * take special measures to avoid it.
         *
         * Specifically, if a number of rewriters have prepareToCommit() invoked in sequence, then two problematic things happen:
         *   1. The obsoleteOriginals() call of the first rewriter immediately removes the originals from the tracker, despite
         *      their having been only partially replaced.  To avoid this, we must either avoid obsoleteOriginals() or checkpoint()
         *   2. The LifecycleTransaction may only have prepareToCommit() invoked once, and this will checkpoint() also.
         *
         * Similarly commit() would finalise partially complete on-disk state.
         *
         * To avoid these problems, we introduce a SharedTxn that proxies all calls onto the underlying transaction
         * except prepareToCommit(), checkpoint(), obsoleteOriginals(), and commit().
         * We then invoke these methods directly once each of the rewriters has updated the transaction
         * with its share of replacements.
         *
         * Note that for the same essential reason we also explicitly disable early open.
         * By noop-ing checkpoint we avoid any of the problems with early open, but by continuing to explicitly
         * disable it we also prevent any of the extra associated work from being performed.
         */
        class SharedTxn extends WrappedLifecycleTransaction
        {
            public SharedTxn(ILifecycleTransaction delegate) { super(delegate); }
            public Throwable commit(Throwable accumulate) { return accumulate; }
            public void prepareToCommit() {}
            public void checkpoint() {}
            public void obsoleteOriginals() {}
            public void close() {}
        }

        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
        try (SharedTxn sharedTxn = new SharedTxn(txn);
             SSTableRewriter fullWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
             SSTableRewriter transWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);
             SSTableRewriter unrepairedWriter = SSTableRewriter.constructWithoutEarlyOpening(sharedTxn, false, groupMaxDataAge);

             AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(txn.originals());
             CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec));
             CompactionIterator ci = getAntiCompactionIterator(scanners.scanners, controller, nowInSec, nextTimeUUID(), active, isCancelled))
        {
            // Saturate before narrowing: a bare (int) cast of a key count above Integer.MAX_VALUE wraps around
            // (typically to a negative value), which Math.max would then silently replace with minIndexInterval,
            // leaving the new sstables with a badly undersized bloom filter estimate.
            int approximateKeyCount = (int) Math.min(Integer.MAX_VALUE, SSTableReader.getApproximateKeyCount(sstableAsSet));
            int expectedBloomFilterSize = Math.max(cfs.metadata().params.minIndexInterval, approximateKeyCount);

            // The three writers differ only in the pending-repair id and the boolean flag passed here
            // (NOTE(review): the boolean is presumably "isTransient" - confirm against createWriterForAntiCompaction).
            fullWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, false, sstableAsSet, txn));
            transWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, pendingRepair, true, sstableAsSet, txn));
            unrepairedWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false, sstableAsSet, txn));

            // An empty range set can never contain a token, so skip building a checker for it.
            Predicate<Token> fullChecker = !ranges.onlyFull().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyFull().ranges()) : t -> false;
            Predicate<Token> transChecker = !ranges.onlyTransient().isEmpty() ? new Range.OrderedRangeContainmentChecker(ranges.onlyTransient().ranges()) : t -> false;
            double compressionRatio = scanners.getCompressionRatio();
            if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
                compressionRatio = 1.0;

            long lastBytesScanned = 0;

            while (ci.hasNext())
            {
                try (UnfilteredRowIterator partition = ci.next())
                {
                    Token token = partition.partitionKey().getToken();

                    // Route the partition to the sstable matching its token: full first, then transient,
                    // otherwise unrepaired. The transient checker is deliberately only consulted when the
                    // full checker rejects the token, exactly one writer receives each partition.
                    SSTableRewriter target;
                    if (fullChecker.test(token))
                        target = fullWriter;
                    else if (transChecker.test(token))
                        target = transWriter;
                    else
                        target = unrepairedWriter;

                    target.append(partition);
                    ci.setTargetDirectory(target.currentWriter().getFilename());

                    // Throttle on the bytes scanned since the previous partition.
                    long bytesScanned = scanners.getTotalBytesScanned();
                    compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
                    lastBytesScanned = bytesScanned;
                }
            }

            // The rewriters only see the SharedTxn, so their prepareToCommit() does not touch the real
            // transaction's state; the real checkpoint / obsolete / prepare happen exactly once, afterwards.
            fullWriter.prepareToCommit();
            transWriter.prepareToCommit();
            unrepairedWriter.prepareToCommit();
            txn.checkpoint();
            txn.obsoleteOriginals();
            txn.prepareToCommit();

            // Snapshot the results before committing so they can be logged and counted afterwards.
            List<SSTableReader> fullSSTables = new ArrayList<>(fullWriter.finished());
            List<SSTableReader> transSSTables = new ArrayList<>(transWriter.finished());
            List<SSTableReader> unrepairedSSTables = new ArrayList<>(unrepairedWriter.finished());

            fullWriter.commit();
            transWriter.commit();
            unrepairedWriter.commit();
            txn.commit();
            logger.info("Anticompacted {} in {}.{} to full = {}, transient = {}, unrepaired = {} for {}",
                        sstableAsSet,
                        cfs.getKeyspaceName(),
                        cfs.getTableName(),
                        fullSSTables,
                        transSSTables,
                        unrepairedSSTables,
                        pendingRepair);
            return fullSSTables.size() + transSSTables.size() + unrepairedSSTables.size();
        }
        catch (Throwable e)
        {
            if (e instanceof CompactionInterruptedException)
            {
                // Interruption is an expected outcome: log at info and distinguish a cancelled session
                // from an explicit stop request.
                if (isCancelled.getAsBoolean())
                {
                    logger.info("Anticompaction has been canceled for session {}", pendingRepair);
                    logger.trace(e.getMessage(), e);
                }
                else
                {
                    logger.info("Anticompaction for session {} has been stopped by request.", pendingRepair);
                }
            }
            else
            {
                JVMStabilityInspector.inspectThrowable(e);
                logger.error("Error anticompacting " + txn + " for " + pendingRepair, e);
            }
            // Always propagate: logging here never swallows the failure.
            throw e;
        }
    }