in src/java/org/apache/cassandra/db/ReadCommand.java [597:726]
/**
 * Wraps {@code iter} in a transformation that, while the iterator is consumed, counts live rows and
 * tombstones, enforces the tombstone failure threshold, and on close reports latency, scan histograms
 * and tombstone warnings to {@code metric}.
 * <p>
 * The failure threshold is enforced lazily: a {@link TombstoneOverwhelmingException} is thrown from the
 * returned iterator (not from this method) as soon as the running tombstone count exceeds the threshold.
 * Both thresholds are ignored for tables in a local system keyspace.
 *
 * @param iter           the partition iterator to instrument
 * @param metric         the table metrics updated by the instrumentation
 * @param startTimeNanos the start time, compared against {@code nanoTime()} on close to compute the latency
 * @return {@code iter} with the metric-recording transformation applied
 */
private UnfilteredPartitionIterator withMetricsRecording(UnfilteredPartitionIterator iter, final TableMetrics metric, final long startTimeNanos)
{
    class MetricRecording extends Transformation<UnfilteredRowIterator>
    {
        private final int failureThreshold = DatabaseDescriptor.getTombstoneFailureThreshold();
        private final int warningThreshold = DatabaseDescriptor.getTombstoneWarnThreshold();

        // Thresholds are not applied to tables of a local system keyspace.
        private final boolean respectTombstoneThresholds = !SchemaConstants.isLocalSystemKeyspace(ReadCommand.this.metadata().keyspace);
        private final boolean enforceStrictLiveness = metadata().enforceStrictLiveness();

        // Running totals across every partition seen by this transformation.
        private int liveRows = 0;
        private int tombstones = 0;

        // Totals as of the last partition close, used to derive per-partition deltas.
        private int lastReportedLiveRows = 0;
        private int lastReportedTombstones = 0;

        // Key of the partition currently (or most recently) being iterated.
        private DecoratedKey currentKey;

        @Override
        public UnfilteredRowIterator applyToPartition(UnfilteredRowIterator iter)
        {
            currentKey = iter.partitionKey();
            return Transformation.apply(iter, this);
        }

        @Override
        public Row applyToStatic(Row row)
        {
            // Static rows are counted exactly like regular rows.
            return applyToRow(row);
        }

        @Override
        public Row applyToRow(Row row)
        {
            boolean hasTombstones = false;
            for (Cell<?> cell : row.cells())
            {
                if (!cell.isLive(ReadCommand.this.nowInSec()))
                {
                    countTombstone(row.clustering());
                    hasTombstones = true; // allows to avoid counting an extra tombstone if the whole row expired
                }
            }

            if (row.hasLiveData(ReadCommand.this.nowInSec(), enforceStrictLiveness))
                ++liveRows;
            else if (!row.primaryKeyLivenessInfo().isLive(ReadCommand.this.nowInSec())
                     && row.hasDeletion(ReadCommand.this.nowInSec())
                     && !hasTombstones)
            {
                // We're counting primary key deletions only here.
                countTombstone(row.clustering());
            }

            return row;
        }

        @Override
        public RangeTombstoneMarker applyToMarker(RangeTombstoneMarker marker)
        {
            // Every range tombstone marker counts as one tombstone.
            countTombstone(marker.clustering());
            return marker;
        }

        /**
         * Increments the tombstone count and aborts the read once the count exceeds the failure threshold
         * (when thresholds apply to this table).
         *
         * @param clustering the clustering of the row or marker being counted, reported in the exception
         * @throws TombstoneOverwhelmingException if the count is now strictly greater than the failure threshold
         */
        private void countTombstone(ClusteringPrefix<?> clustering)
        {
            ++tombstones;
            if (tombstones > failureThreshold && respectTombstoneThresholds)
            {
                String query = ReadCommand.this.toCQLString();
                Tracing.trace("Scanned over {} tombstones for query {}; query aborted (see tombstone_failure_threshold)", failureThreshold, query);
                metric.tombstoneFailures.inc();
                if (trackWarnings)
                {
                    // Replace any warning parameter with the failure parameter.
                    MessageParams.remove(ParamType.TOMBSTONE_WARNING);
                    MessageParams.add(ParamType.TOMBSTONE_FAIL, tombstones);
                }
                throw new TombstoneOverwhelmingException(tombstones, query, ReadCommand.this.metadata(), currentKey, clustering);
            }
        }

        @Override
        protected void onPartitionClose()
        {
            // The counters are cumulative, so sample only what this partition contributed.
            int lr = liveRows - lastReportedLiveRows;
            int ts = tombstones - lastReportedTombstones;

            if (lr > 0)
                metric.topReadPartitionRowCount.addSample(currentKey.getKey(), lr);

            if (ts > 0)
                metric.topReadPartitionTombstoneCount.addSample(currentKey.getKey(), ts);

            lastReportedLiveRows = liveRows;
            lastReportedTombstones = tombstones;
        }

        @Override
        public void onClose()
        {
            recordLatency(metric, nanoTime() - startTimeNanos);

            metric.tombstoneScannedHistogram.update(tombstones);
            metric.liveScannedHistogram.update(liveRows);

            boolean warnTombstones = tombstones > warningThreshold && respectTombstoneThresholds;
            if (warnTombstones)
            {
                // The %1.512s conversion caps the query text at 512 characters.
                String msg = String.format(
                        "Read %d live rows and %d tombstone cells for query %1.512s; token %s (see tombstone_warn_threshold)",
                        liveRows, tombstones, ReadCommand.this.toCQLString(), currentKey.getToken());
                if (trackWarnings)
                    MessageParams.add(ParamType.TOMBSTONE_WARNING, tombstones);
                else
                    ClientWarn.instance.warn(msg);

                // countTombstone() only aborts (and bumps tombstoneFailures) when the count is strictly
                // greater than the failure threshold, so a read that scanned exactly failureThreshold
                // tombstones completed and must be counted as a warning; only aborted reads are skipped.
                if (tombstones <= failureThreshold)
                {
                    metric.tombstoneWarnings.inc();
                }

                logger.warn(msg);
            }

            Tracing.trace("Read {} live rows and {} tombstone cells{}",
                          liveRows, tombstones,
                          (warnTombstones ? " (see tombstone_warn_threshold)" : ""));
        }
    }

    return Transformation.apply(iter, new MetricRecording());
}