in cassandra-four-zero-bridge/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java [398:514]
/**
 * Reads serialized mutations from one commit log section and hands every entry that passes
 * both of its checksums to {@code readMutationInternal}.
 *
 * <p>Each entry is consumed in this order: a 4-byte serialized size, the claimed checksum of that
 * size, {@code serializedSize} bytes of payload, and the claimed CRC of the whole entry.
 * Reading stops when the status tracker no longer wants to continue, when the reader reaches
 * {@code end} or EOF, when an end-of-segment marker is read, or when a size/EOF error is hit.
 *
 * <p>The elapsed time is reported through {@code stats.commitLogSegmentReadTime} on every exit
 * path, including the early returns and propagated exceptions.
 *
 * @param reader input to read entries from; reading starts at its current file pointer
 * @param end    file position at which this section ends; no entry is started at or beyond it
 * @throws IOException if reading from {@code reader} fails with anything other than an
 *                     {@link EOFException}, which is handled here
 */
private void readSection(FileDataInput reader,
                         int end) throws IOException
{
    long startTimeNanos = System.nanoTime();
    logger.trace("Reading log section", "end", end);
    try
    {
        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
        {
            int mutationStart = (int) reader.getFilePointer();
            logger.trace("Reading mutation at", "position", mutationStart);
            long claimedCRC32;
            int serializedSize;
            try
            {
                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
                // However, it's possible with 2.1 era commitlogs that the last mutation ended less than 4 bytes
                // from the end of the file, which means that we'll be unable to read a full int and would
                // instead hit an EOF here
                if (end - reader.getFilePointer() < 4)
                {
                    logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
                    statusTracker.requestTermination();
                    return;
                }

                // any of the reads may hit EOF
                serializedSize = reader.readInt();
                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
                {
                    logger.trace("Encountered end of segment marker at", "position", reader.getFilePointer());
                    statusTracker.requestTermination();
                    return;
                }

                // Mutation must be at LEAST 10 bytes:
                //    3 for a non-empty Keyspace
                //    3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
                //    4 bytes for column count.
                // This prevents the CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
                if (serializedSize < 10)
                {
                    terminateIfSegmentShouldBeSkipped(
                        String.format("Invalid mutation size %d at %d in %s", serializedSize, mutationStart, statusTracker.errorContext),
                        CommitLogReadErrorReason.MUTATION_ERROR);
                    stats.commitLogInvalidSizeMutationCount(1);
                    return;
                }

                // Verify the size field on its own before trusting it to size the payload read
                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader);
                checksum.reset();
                CommitLogFormat.updateChecksum(checksum, serializedSize);
                if (checksum.getValue() != claimedSizeChecksum)
                {
                    terminateIfSegmentShouldBeSkipped(
                        String.format("Mutation size checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
                        CommitLogReadErrorReason.MUTATION_ERROR);
                    stats.mutationsChecksumMismatchCount(1);
                    return;
                }

                // Grow the reusable buffer with 20% headroom when the payload does not fit
                if (serializedSize > buffer.length)
                {
                    buffer = new byte[(int) (1.2 * serializedSize)];
                }
                reader.readFully(buffer, 0, serializedSize);
                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader);
            }
            catch (EOFException eof)
            {
                terminateIfSegmentShouldBeSkipped(
                    String.format("Unexpected end of segment at %d in %s", mutationStart, statusTracker.errorContext),
                    CommitLogReadErrorReason.EOF);
                stats.commitLogSegmentUnexpectedEndErrorCount(1);
                return;
            }

            // The checksum is not reset here, so it covers the size field and the payload together
            checksum.update(buffer, 0, serializedSize);
            if (claimedCRC32 != checksum.getValue())
            {
                terminateIfSegmentShouldBeSkipped(
                    String.format("Mutation checksum failure at %d in %s", mutationStart, statusTracker.errorContext),
                    CommitLogReadErrorReason.MUTATION_ERROR);
                stats.mutationsChecksumMismatchCount(1);
                // Unlike the size errors above, a bad entry CRC moves on to the next entry;
                // the loop condition stops the read if termination was requested
                continue;
            }

            int mutationPosition = (int) reader.getFilePointer();
            readMutationInternal(buffer, serializedSize, mutationPosition);
            statusTracker.addProcessedMutation();
        }
    }
    finally
    {
        // Recorded in finally so the early returns above do not skip the timing metric
        stats.commitLogSegmentReadTime(System.nanoTime() - startTimeNanos);
    }
}

/**
 * Wraps {@code message} in a {@code CommitLogReadException} and passes it to
 * {@code shouldSkipSegmentOnError}; requests termination of the read when that returns true.
 *
 * @param message description of the read error
 * @param reason  category of the read error
 * @throws IOException if {@code shouldSkipSegmentOnError} throws it
 */
private void terminateIfSegmentShouldBeSkipped(String message,
                                               CommitLogReadErrorReason reason) throws IOException
{
    if (this.shouldSkipSegmentOnError(new CommitLogReadException(message,
                                                                 reason,
                                                                 statusTracker.tolerateErrorsInSection)))
    {
        statusTracker.requestTermination();
    }
}