private void readSection()

in cassandra-four-zero/src/main/java/org/apache/cassandra/db/commitlog/BufferingCommitLogReader.java [362:468]


    /**
     * Reads serialized mutations from {@code reader} one record at a time and hands each valid one to
     * {@code readMutationInternal}.
     *
     * <p>As read by this method, a record consists of an {@code int} serialized size, a checksum of that
     * size, {@code serializedSize} payload bytes, and a checksum of the payload. The loop stops when the
     * status tracker no longer wants to continue, when the file pointer reaches {@code end}, or when the
     * reader reports EOF.
     *
     * <p>Failures (size below the minimum, size checksum mismatch, unexpected EOF, payload checksum
     * mismatch) are reported through {@code shouldSkipSegmentOnError}; when that call returns
     * {@code true}, termination is requested on the status tracker. All failures except a payload
     * checksum mismatch end the read of this section immediately.
     *
     * @param reader input positioned at the first record to read
     * @param end    file position at which this section ends
     * @param desc   descriptor passed through to {@code readMutationInternal}
     * @throws IOException if reading from {@code reader} fails for a reason other than the handled EOF
     */
    private void readSection(FileDataInput reader, int end, CommitLogDescriptor desc) throws IOException
    {
        while (statusTracker.shouldContinue() && reader.getFilePointer() < end && !reader.isEOF())
        {
            int mutationStart = (int) reader.getFilePointer();
            logger.trace("Reading mutation at position {}", mutationStart);

            long claimedCRC32;
            int serializedSize;
            try
            {
                // We rely on reading serialized size == 0 (LEGACY_END_OF_SEGMENT_MARKER) to identify the end
                // of a segment, which happens naturally due to the 0 padding of the empty segment on creation.
                // However, it's possible with 2.1 era CommitLogs that the last mutation ended less than 4 bytes
                // from the end of the file, which means that we'll be unable to read a full int and instead
                // read an EOF here.
                if (end - reader.getFilePointer() < 4)
                {
                    logger.trace("Not enough bytes left for another mutation in this CommitLog section, continuing");
                    statusTracker.requestTermination();
                    return;
                }

                // Any of the reads may hit EOF
                serializedSize = reader.readInt();
                if (serializedSize == LEGACY_END_OF_SEGMENT_MARKER)
                {
                    logger.trace("Encountered end of segment marker at position {}", reader.getFilePointer());
                    statusTracker.requestTermination();
                    return;
                }

                // Mutation must be at LEAST 10 bytes:
                //   3 for a non-empty Keyspace
                //   3 for a Key (including the 2-byte length from writeUTF/writeWithShortLength)
                //   4 bytes for column count
                // This prevents CRC from being fooled by special-case garbage in the file; see CASSANDRA-2128
                if (serializedSize < 10)
                {
                    reportSectionReadError(String.format("Invalid mutation size %d at %d in %s",
                                                         serializedSize, mutationStart, statusTracker.errorContext),
                                           CommitLogReadErrorReason.MUTATION_ERROR);
                    return;
                }

                // Verify the size field against its own checksum before trusting it to size the buffer.
                long claimedSizeChecksum = CommitLogFormat.calculateClaimedChecksum(reader);
                checksum.reset();
                CommitLogFormat.updateChecksum(checksum, serializedSize);

                if (checksum.getValue() != claimedSizeChecksum)
                {
                    reportSectionReadError(String.format("Mutation size checksum failure at %d in %s",
                                                         mutationStart, statusTracker.errorContext),
                                           CommitLogReadErrorReason.MUTATION_ERROR);
                    return;
                }

                // Grow the shared buffer with 20% headroom when the payload does not fit.
                if (serializedSize > buffer.length)
                {
                    buffer = new byte[(int) (1.2 * serializedSize)];
                }
                reader.readFully(buffer, 0, serializedSize);

                claimedCRC32 = CommitLogFormat.calculateClaimedCRC32(reader);
            }
            catch (EOFException exception)
            {
                reportSectionReadError(String.format("Unexpected end of segment at %d in %s",
                                                     mutationStart, statusTracker.errorContext),
                                       CommitLogReadErrorReason.EOF);
                return;
            }

            // The checksum already covers the size field (updated above); extend it over the payload.
            checksum.update(buffer, 0, serializedSize);
            if (claimedCRC32 != checksum.getValue())
            {
                reportSectionReadError(String.format("Mutation checksum failure at %d in %s",
                                                     mutationStart, statusTracker.errorContext),
                                       CommitLogReadErrorReason.MUTATION_ERROR);
                // The whole record has been consumed, so the loop condition decides whether to try the next one.
                continue;
            }

            int mutationPosition = (int) reader.getFilePointer();
            readMutationInternal(buffer, serializedSize, mutationPosition, desc);
            statusTracker.addProcessedMutation();
        }
    }

    /**
     * Reports a read failure through {@code shouldSkipSegmentOnError} and requests termination on the
     * status tracker when that call returns {@code true}.
     *
     * @param message description of the failure
     * @param reason  category of the failure
     * @throws IOException if {@code shouldSkipSegmentOnError} throws it
     */
    private void reportSectionReadError(String message, CommitLogReadErrorReason reason) throws IOException
    {
        CommitLogReadException error =
            new CommitLogReadException(message, reason, statusTracker.tolerateErrorsInSection);
        if (shouldSkipSegmentOnError(error))
        {
            statusTracker.requestTermination();
        }
    }