private void readMutationInternal()

in cassandra-four-zero/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java [479:535]


    private void readMutationInternal(byte[] inputBuffer,
                                      int size,
                                      int mutationPosition,
                                      CommitLogDescriptor descriptor) throws IOException
    {
        // For now, we need to go through the motions of deserializing the mutation to determine its size and move
        // the file pointer forward accordingly, even if we're behind the requested minPosition within this SyncSegment

        Mutation mutation;
        try (RebufferingInputStream bufferIn = new DataInputBuffer(inputBuffer, 0, size))
        {
            mutation = Mutation.serializer.deserialize(bufferIn,
                                                       descriptor.getMessagingVersion(),
                                                       DeserializationHelper.Flag.LOCAL);
            // Double-check that what we read is still valid for the current schema
            for (PartitionUpdate update : mutation.getPartitionUpdates())
            {
                update.validate();
            }
        }
        catch (UnknownTableException exception)
        {
            if (exception.id == null)
            {
                return;
            }
            // We see many unknown table exception logs when we skip over mutations from other tables
            logger.trace("Invalid mutation", exception);
            return;
        }
        catch (Throwable throwable)
        {
            JVMStabilityInspector.inspectThrowable(throwable);
            Path path = Files.createTempFile("mutation", "dat");

            try (DataOutputStream out = new DataOutputStream(Files.newOutputStream(path)))
            {
                out.write(inputBuffer, 0, size);
            }

            // Checksum passed so this error can't be permissible
            handleUnrecoverableError(new CommitLogReadException(
                    String.format("Unexpected error deserializing mutation; saved to %s. "
                                + "This may be caused by replaying a mutation against a table with the same name but incompatible schema. "
                                + "Exception follows: %s", path, throwable),
                    CommitLogReadErrorReason.MUTATION_ERROR,
                    false));
            return;
        }

        logger.trace("Read mutation for", () -> "keyspace", mutation::getKeyspaceName,
                                               () -> "key", mutation::key,
                                          () -> "mutation", () -> mutation.getPartitionUpdates().stream()
                                                                                                .map(AbstractBTreePartition::toString)
                                                                                                .collect(Collectors.joining(", ", "{", "}")));
        handleMutation(mutation, size, mutationPosition, descriptor);
    }