in src/java/org/apache/cassandra/db/compaction/CompactionTask.java [147:334]
protected void runMayThrow() throws Exception
{
// The collection of sstables passed may be empty (but not null); even if
// it is not empty, it may compact down to nothing if all rows are deleted.
assert transaction != null;
if (inputSSTables().isEmpty())
return;
// Note that the current compaction strategy is not necessarily the one this task was created under.
// This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
CompactionStrategyManager strategy = cfs.getCompactionStrategyManager();
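// Optionally take a snapshot of the table before compacting (the snapshot_before_compaction setting);
// flushing is skipped since the data being compacted is already on disk.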
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
{
SnapshotOptions options = SnapshotOptions.systemSnapshot(cfs.name, SnapshotType.COMPACT, cfs.getKeyspaceTableName()).skipFlush().build();
SnapshotManager.instance.takeSnapshot(options);
}
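// The controller tracks the sstables that overlap the ones being compacted; it is what decides whether
// tombstones and expired data encountered during the merge can safely be purged.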
try (CompactionController controller = getCompactionController(inputSSTables()))
{
// Note: the controller set-up above relies on using the transaction-provided sstable list, from which
// fully-expired sstables should not be removed (so that the overlap tracker does not include them), but
// sstables excluded for scope reduction should be removed.
Set<SSTableReader> actuallyCompact = new HashSet<>(inputSSTables());
final Set<SSTableReader> fullyExpiredSSTables = controller.getFullyExpiredSSTables();
if (!fullyExpiredSSTables.isEmpty())
{
logger.debug("Compaction {} dropping expired sstables: {}", transaction.opIdString(), fullyExpiredSSTables);
actuallyCompact.removeAll(fullyExpiredSSTables);
}
TimeUUID taskId = transaction.opId();
// select SSTables to compact based on available disk space.
final boolean hasExpirations = !fullyExpiredSSTables.isEmpty();
if ((shouldReduceScopeForSpace() && !buildCompactionCandidatesForAvailableDiskSpace(actuallyCompact, hasExpirations, taskId))
|| hasExpirations)
{
// The set of sstables being compacted has changed: either fully expired sstables were dropped, or one or
// more sstables were excluded due to limited available disk space. We need to recompute the overlaps
// between sstables. The iterators used by the compaction controller and tracker will reflect the changes
// made by LifecycleTransaction.cancel(), so refreshing the overlaps will be based on the updated set of sstables.
controller.refreshOverlaps();
}
// sanity check: all sstables must belong to the same cfs
assert Iterables.all(actuallyCompact, sstable -> sstable.descriptor.cfname.equals(cfs.name));
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
StringBuilder ssTableLoggerMsg = new StringBuilder("[");
for (SSTableReader sstr : actuallyCompact)
{
ssTableLoggerMsg.append(sstr.getSSTableLevel() != 0 ? String.format("%s:level=%d", sstr.getFilename(), sstr.getSSTableLevel())
: sstr.getFilename());
ssTableLoggerMsg.append(", ");
}
ssTableLoggerMsg.append("]");
logger.info("Compacting ({}) {}", transaction.opIdString(), ssTableLoggerMsg);
RateLimiter limiter = CompactionManager.instance.getRateLimiter();
long start = nanoTime();
long startTime = currentTimeMillis();
long totalKeysWritten = 0;
long estimatedKeys = 0;
long inputSizeBytes;
long timeSpentWritingKeys;
Collection<SSTableReader> newSStables;
long[] mergedRowCounts;
long totalSourceCQLRows;
Range<Token> tokenRange = tokenRange();
List<Range<Token>> rangeList = tokenRange != null ? ImmutableList.of(tokenRange) : null;
long nowInSec = FBUtilities.nowInSeconds();
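// The strategy provides scanners over the input sstables (restricted to the given token ranges when
// present); the CompactionIterator merge-sorts those streams and applies the controller's purge decisions.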
try (Refs<SSTableReader> refs = Refs.ref(actuallyCompact);
AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact, rangeList);
CompactionIterator ci = new CompactionIterator(compactionType, scanners.scanners, controller, nowInSec, taskId))
{
long lastCheckObsoletion = start;
inputSizeBytes = scanners.getTotalCompressedSize();
double compressionRatio = scanners.getCompressionRatio();
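// NO_COMPRESSION_RATIO is the sentinel for uncompressed sstables; fall back to 1.0 so the throttling
// below treats scanned bytes as on-disk bytes unchanged.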
if (compressionRatio == MetadataCollector.NO_COMPRESSION_RATIO)
compressionRatio = 1.0;
long lastBytesScanned = 0;
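// Register this compaction with the shared tracker so it is visible to tooling (e.g. nodetool
// compactionstats) and can be interrupted; finishCompaction in the finally block below deregisters it.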
activeCompactions.beginCompaction(ci);
try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, getDirectories(), transaction, actuallyCompact))
{
// Note that we need to re-check that the CSM is still active after calling beginCompaction above to avoid a window
// where the compaction does not exist in activeCompactions but the CSM gets paused.
// We already have the sstables marked compacting here so CompactionManager#waitForCessation will
// block until the below exception is thrown and the transaction is cancelled.
if (!controller.cfs.getCompactionStrategyManager().isActive())
throw new CompactionInterruptedException(ci.getCompactionInfo());
estimatedKeys = writer.estimatedKeys();
while (ci.hasNext())
{
if (writer.append(ci.next()))
totalKeysWritten++;
ci.setTargetDirectory(writer.getSStableDirectory().path());
long bytesScanned = scanners.getTotalBytesScanned();
// Rate limit the scanners, and account for compression
CompactionManager.instance.compactionRateLimiterAcquire(limiter, bytesScanned, lastBytesScanned, compressionRatio);
lastBytesScanned = bytesScanned;
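// Roughly once a minute, let the controller refresh its view of overlapping sstables so a long-running
// compaction notices sstables that have since become obsolete.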
if (nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
{
controller.maybeRefreshOverlaps();
lastCheckObsoletion = nanoTime();
}
}
timeSpentWritingKeys = TimeUnit.NANOSECONDS.toMillis(nanoTime() - start);
// point of no return
newSStables = writer.finish();
}
finally
{
activeCompactions.finishCompaction(ci);
mergedRowCounts = ci.getMergedRowCounts();
totalSourceCQLRows = ci.getTotalSourceCQLRows();
}
}
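// Offline transactions (e.g. standalone sstable tools) skip the stats logging, compaction history and
// metrics updates below.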
if (transaction.isOffline())
return;
// log statistics about the result and save them to the compaction_history system table
long durationInNano = nanoTime() - start;
long dTime = TimeUnit.NANOSECONDS.toMillis(durationInNano);
long startsize = inputSizeBytes;
long endsize = SSTableReader.getTotalBytes(newSStables);
double ratio = (double) endsize / (double) startsize;
StringBuilder newSSTableNames = new StringBuilder();
for (SSTableReader reader : newSStables)
newSSTableNames.append(reader.descriptor.baseFile()).append(",");
long totalSourceRows = 0;
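// mergedRowCounts[i] is the number of output partitions that were merged from (i + 1) input versions,
// so weighting each bucket by (i + 1) recovers the total number of input partitions read.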
for (int i = 0; i < mergedRowCounts.length; i++)
totalSourceRows += mergedRowCounts[i] * (i + 1);
String mergeSummary = updateCompactionHistory(taskId, cfs.getKeyspaceName(), cfs.getTableName(), mergedRowCounts, startsize, endsize,
ImmutableMap.of(COMPACTION_TYPE_PROPERTY, compactionType.type));
logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %s to %s (~%d%% of original) in %,dms. Read Throughput = %s, Write Throughput = %s, Row Throughput = ~%,d/s. %,d total partitions merged to %,d. Partition merge counts were {%s}. Time spent writing keys = %,dms",
transaction.opIdString(),
actuallyCompact.size(),
newSSTableNames.toString(),
getLevel(),
FBUtilities.prettyPrintMemory(startsize),
FBUtilities.prettyPrintMemory(endsize),
(int) (ratio * 100),
dTime,
FBUtilities.prettyPrintMemoryPerSecond(startsize, durationInNano),
FBUtilities.prettyPrintMemoryPerSecond(endsize, durationInNano),
(int) totalSourceCQLRows / (TimeUnit.NANOSECONDS.toSeconds(durationInNano) + 1),
totalSourceRows,
totalKeysWritten,
mergeSummary,
timeSpentWritingKeys));
if (logger.isTraceEnabled())
{
logger.trace("CF Total Bytes Compacted: {}", FBUtilities.prettyPrintMemory(CompactionTask.addToTotalBytesCompacted(endsize)));
logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
}
cfs.getCompactionStrategyManager().compactionLogger.compaction(startTime, inputSSTables(), currentTimeMillis(), newSStables);
// update the metrics
cfs.metric.compactionBytesWritten.inc(endsize);
}
}