protected void runMayThrow()

in src/java/org/apache/cassandra/db/compaction/CompactionTask.java [147:334]


    /**
     * Runs the compaction over the sstables held by {@code transaction}.
     * <p>
     * In order, this method:
     * <ol>
     *   <li>returns immediately when there are no input sstables;</li>
     *   <li>takes a snapshot first when {@code DatabaseDescriptor.isSnapshotBeforeCompaction()} is set;</li>
     *   <li>removes the fully expired sstables reported by the controller from the set to compact and, when
     *       {@code shouldReduceScopeForSpace()} is set, lets
     *       {@code buildCompactionCandidatesForAvailableDiskSpace} adjust that set, refreshing the controller's
     *       overlaps if either step changed it;</li>
     *   <li>feeds every partition produced by a {@link CompactionIterator} to a {@link CompactionAwareWriter},
     *       rate limiting on the bytes scanned and refreshing overlaps roughly once a minute;</li>
     *   <li>unless the transaction is offline, logs statistics about the result, calls
     *       {@code updateCompactionHistory}, notifies the compaction logger and increments the
     *       {@code compactionBytesWritten} metric.</li>
     * </ol>
     *
     * @throws CompactionInterruptedException if the compaction strategy manager is no longer active once the
     *         compaction has been registered with {@code activeCompactions}
     * @throws Exception any other failure raised while scanning, merging or writing sstables
     */
    protected void runMayThrow() throws Exception
    {
        // The collection of sstables passed may be empty (but not null); even if
        // it is not empty, it may compact down to nothing if all rows are deleted.
        assert transaction != null;

        if (inputSSTables().isEmpty())
            return;

        // Note that the current compaction strategy is not necessarily the one this task was created under.
        // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
        CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();

        if (DatabaseDescriptor.isSnapshotBeforeCompaction())
        {
            SnapshotOptions options = SnapshotOptions.systemSnapshot(cfs.name, SnapshotType.COMPACT, cfs.getKeyspaceTableName()).skipFlush().build();
            SnapshotManager.instance.takeSnapshot(options);
        }

        try (CompactionController controller = getCompactionController(inputSSTables()))
        {
            // Note: the controller set-up above relies on using the transaction-provided sstable list, from which
            // fully-expired sstables should not be removed (so that the overlap tracker does not include them), but
            // sstables excluded for scope reduction should be removed.
            Set<SSTableReader> actuallyCompact = new HashSet<>(inputSSTables());

            final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
            if (!fullyExpiredSSTables.isEmpty())
            {
                logger.debug("Compaction {} dropping expired sstables: {}", transaction.opIdString(), fullyExpiredSSTables);
                actuallyCompact.removeAll(fullyExpiredSSTables);
            }

            TimeUUID taskId = transaction.opId();
            // select SSTables to compact based on available disk space.
            final boolean hasExpirations = !fullyExpiredSSTables.isEmpty();
            if ((shouldReduceScopeForSpace() && !buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact, hasExpirations, taskId))
                || hasExpirations)
            {
                // The set of sstables has changed (one or more were excluded due to limited available disk space).
                // We need to recompute the overlaps between sstables. The iterators used in the compaction controller
                // and tracker will reflect the changed set of sstables made by LifecycleTransaction.cancel(),
                // so refreshing the overlaps will be based on the updated set of sstables.
                controller.refreshOverlaps();
            }

            // sanity check: all sstables must belong to the same cfs
            assert !Iterables.any(actuallyCompact, new Predicate<SSTableReader>()
            {
                @Override
                public boolean apply(SSTableReader sstable)
                {
                    return !sstable.descriptor.cfname.equals(cfs.name);
                }
            });

            // new sstables from flush can be added during a compaction, but only the compaction can remove them,
            // so in our single-threaded compaction world this is a valid way of determining if we're compacting
            // all the sstables (that existed when we started)
            StringBuilder ssTableLoggerMsg = new StringBuilder("[");
            for (SSTableReader sstr : actuallyCompact)
            {
                // Only non-zero levels are spelled out next to the file name.
                ssTableLoggerMsg.append(sstr.getSSTableLevel() != 0 ? String.format("%s:level=%d", sstr.getFilename(), sstr.getSSTableLevel())
                                                                    : sstr.getFilename());
                ssTableLoggerMsg.append(", ");
            }
            ssTableLoggerMsg.append("]");

            logger.info("Compacting ({}) {}", transaction.opIdString(), ssTableLoggerMsg);

            RateLimiter limiter = CompactionManager.instance.getRateLimiter();
            long start = nanoTime();
            long startTime = currentTimeMillis();
            long totalKeysWritten = 0;
            long estimatedKeys = 0;
            long inputSizeBytes;
            long timeSpentWritingKeys;

            Collection<SSTableReader> newSStables;

            long[] mergedRowCounts;
            long totalSourceCQLRows;

            // A null range list is passed to getScanners when this task has no token range.
            Range<Token> tokenRange = tokenRange();
            List<Range<Token>> rangeList = tokenRange != null ? ImmutableList.of(tokenRange) : null;

            long nowInSec = FBUtilities.nowInSeconds();
            try (Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
                 AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact, rangeList);
                 CompactionIterator ci = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, taskId))
            {
                long lastCheckObsoletion = start;
                inputSizeBytes = scanners.getTotalCompressedSize();
                double compressionRatio = scanners.getCompressionRatio();
                if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
                    compressionRatio = 1.0;

                long lastBytesScanned = 0;

                activeCompactions.beginCompaction(ci);
                try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
                {
                    // Note that we need to re-check this flag after calling beginCompaction above to avoid a window
                    // where the compaction does not exist in activeCompactions but the CSM gets paused.
                    // We already have the sstables marked compacting here so CompactionManager#waitForCessation will
                    // block until the below exception is thrown and the transaction is cancelled.
                    if (!controller.cfs.getCompactionStrategyManager().isActive())
                        throw new CompactionInterruptedException(ci.getCompactionInfo());
                    estimatedKeys = writer.estimatedKeys();
                    while (ci.hasNext())
                    {
                        // append() reports whether the partition was actually written.
                        if (writer.append(ci.next()))
                            totalKeysWritten++;

                        ci.setTargetDirectory(writer.getSStableDirectory().path());
                        long bytesScanned = scanners.getTotalBytesScanned();

                        // Rate limit the scanners, and account for compression
                        CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);

                        lastBytesScanned = bytesScanned;

                        // Give the controller a chance to refresh its overlaps at most about once a minute.
                        if (nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
                        {
                            controller.maybeRefreshOverlaps();
                            lastCheckObsoletion = nanoTime();
                        }
                    }
                    timeSpentWritingKeys = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start);

                    // point of no return
                    newSStables = writer.finish();
                }
                finally
                {
                    // Always deregister the compaction, and capture the iterator's counters while it is still open.
                    activeCompactions.finishCompaction(ci);
                    mergedRowCounts = ci.getMergedRowCounts();
                    totalSourceCQLRows = ci.getTotalSourceCQLRows();
                }
            }

            if (transaction.isOffline())
                return;

            // log a bunch of statistics about the result and save to system table compaction_history
            long durationInNano = nanoTime() - start;
            long dTime = TimeUnit.NANOSECONDS.toMillis(durationInNano);
            long startsize = inputSizeBytes;
            long endsize = SSTableReader.getTotalBytes(newSStables);
            double ratio = (double) endsize / (double) startsize;

            StringBuilder newSSTableNames = new StringBuilder();
            for (SSTableReader reader : newSStables)
                newSSTableNames.append(reader.descriptor.baseFile()).append(",");
            // Each count at index i is weighted by (i + 1) to obtain the total number of source partitions.
            long totalSourceRows = 0;
            for (int i = 0; i < mergedRowCounts.length; i++)
                totalSourceRows += mergedRowCounts[i] * (i + 1);

            String mergeSummary = updateCompactionHistory(taskId, cfs.getKeyspaceName(), cfs.getTableName(), mergedRowCounts, startsize, endsize,
                                                          ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType.type));

            logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %s to %s (~%d%% of original) in %,dms.  Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}. Time spent writing keys = %,dms",
                                      transaction.opIdString(),
                                      actuallyCompact.size(),
                                      newSSTableNames.toString(),
                                      getLevel(),
                                      FBUtilities.prettyPrintMemory(startsize),
                                      FBUtilities.prettyPrintMemory(endsize),
                                      (int) (ratio * 100),
                                      dTime,
                                      FBUtilities.prettyPrintMemoryPerSecond(startsize, durationInNano),
                                      FBUtilities.prettyPrintMemoryPerSecond(endsize, durationInNano),
                                      // Keep the division in long arithmetic: narrowing the row count to int first
                                      // would wrap around for more than Integer.MAX_VALUE rows. The +1 keeps the
                                      // divisor non-zero for compactions that take less than a second.
                                      totalSourceCQLRows / (TimeUnit.NANOSECONDS.toSeconds(durationInNano) + 1),
                                      totalSourceRows,
                                      totalKeysWritten,
                                      mergeSummary,
                                      timeSpentWritingKeys));
            if (logger.isTraceEnabled())
            {
                logger.trace("CF Total Bytes Compacted: {}", FBUtilities.prettyPrintMemory(CompactionTask.addToTotalBytesCompacted(endsize)));
                logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
            }
            cfs.getCompactionStrategyManager().compactionLogger.compaction(startTime, inputSSTables(), currentTimeMillis(), newSStables);

            // update the metrics
            cfs.metric.compactionBytesWritten.inc(endsize);
        }
    }