in cassandra-four-zero/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java [479:535]
private void readMutationInternal(byte[] inputBuffer,
int size,
int mutationPosition,
CommitLogDescriptor descriptor) throws IOException
{
// For now, we need to go through the motions of deserializing the mutation to determine its size and move
// the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment
Mutation mutation;
try (RebufferingInputStream bufferIn = new DataInputBuffer(inputBuffer, 0, size))
{
mutation = Mutation.serializer.deserialize(bufferIn,
descriptor.getMessagingVersion(),
DeserializationHelper.Flag.LOCAL);
// Double-check that what we read is still valid for the current schema
for (PartitionUpdate update : mutation.getPartitionUpdates())
{
update.validate();
}
}
catch (UnknownTableException exception)
{
if (exception.id == null)
{
return;
}
// We see many unknown table exception logs when we skip over mutations from other tables
logger.trace("Invalid mutation", exception);
return;
}
catch (Throwable throwable)
{
JVMStabilityInspector.inspectThrowable(throwable);
Path path = Files.createTempFile("mutation", "dat");
try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(path)))
{
out.write(inputBuffer, 0, size);
}
// Checksum passed so this error can't be permissible
handleUnrecoverableError(new CommitLogReadException(
String.format("Unexpected error deserializing mutation; saved to %s. "
+ "This may be caused by replaying a mutation against a table with the same name but incompatible schema. "
+ "Exception follows: %s", path, throwable),
CommitLogReadErrorReason.MUTATION_ERROR,
false));
return;
}
logger.trace("Read mutation for", () -> "keyspace", mutation::getKeyspaceName,
() -> "key", mutation::key,
() -> "mutation", () -> mutation.getPartitionUpdates().stream()
.map(AbstractBTreePartition::toString)
.collect(Collectors.joining(", ", "{", "}")));
handleMutation(mutation, size, mutationPosition, descriptor);
}