protected void scrubInternal()

in src/java/org/apache/cassandra/io/sstable/format/big/BigTableScrubber.java [85:214]


    /**
     * Walks the data file partition by partition, cross-checking each partition key and position
     * against the partition index (when one is available) and handing every partition to
     * {@code tryAppend} so it can be written through {@code writer}.
     * <p>
     * Recovery strategy, as implemented below:
     * <ol>
     *   <li>Read the key from the data file; a failure here is tolerated and leaves {@code key} null.</li>
     *   <li>Try to append the partition using the key/position found in the data file.</li>
     *   <li>If that fails and the index disagrees with the data file (or the key was unreadable),
     *       retry once using the key and position taken from the index.</li>
     *   <li>If the retry fails too, or no retry is possible, count the partition in
     *       {@code badPartitions} and try to skip to the next partition.</li>
     * </ol>
     * Every caught {@code Throwable} is first passed to {@code throwIfFatal}, and unrecoverable
     * partitions are passed to {@code throwIfCannotContinue} before being skipped
     * (NOTE(review): both helpers are defined outside this view; presumably they rethrow when
     * scrubbing must not continue - confirm).
     *
     * @param writer the rewriter passed through to {@code tryAppend} for each partition
     * @throws IOException declared for the underlying data/index file reads and seeks
     * @throws CompactionInterruptedException if {@code scrubInfo} reports that a stop was requested
     */
    protected void scrubInternal(SSTableRewriter writer) throws IOException
    {
        // Prime the index reader: fetch the first index key and consume the first index entry.
        try
        {
            nextIndexKey = indexAvailable() ? ByteBufferUtil.readWithShortLength(indexFile) : null;
            if (indexAvailable())
            {
                // throw away variable, so we don't have a side effect in the assertion
                long firstRowPositionFromIndex = rowIndexEntrySerializer.deserializePositionAndSkip(indexFile);
                assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex;
            }
        }
        catch (Throwable ex)
        {
            // Note: this also catches the AssertionError from the position check above.
            throwIfFatal(ex);
            // The index could not be primed: clear the pending index key, treat the "next partition"
            // as starting at the end of the data file, and move the index reader to its end.
            nextIndexKey = null;
            nextPartitionPositionFromIndex = dataFile.length();
            if (indexFile != null)
                indexFile.seek(indexFile.length());
        }

        // Last key for which tryAppend returned true; handed back to tryAppend on each later call.
        DecoratedKey prevKey = null;

        while (!dataFile.isEOF())
        {
            if (scrubInfo.isStopRequested())
                throw new CompactionInterruptedException(scrubInfo.getCompactionInfo());

            long partitionStart = dataFile.getFilePointer();
            outputHandler.debug("Reading row at %d", partitionStart);

            // Step 1: read the partition key from the data file. Any non-fatal failure leaves key == null.
            DecoratedKey key = null;
            try
            {
                ByteBuffer raw = ByteBufferUtil.readWithShortLength(dataFile);
                // Key type validation is skipped when the local table metadata reports isIndex().
                if (!cfs.metadata.getLocal().isIndex())
                    cfs.metadata.getLocal().partitionKeyType.validate(raw);
                key = sstable.decorateKey(raw);
            }
            catch (Throwable th)
            {
                throwIfFatal(th);
                // check for null key below
            }

            // -1 means "not known from the index".
            long dataStartFromIndex = -1;
            long dataSizeFromIndex = -1;
            // NOTE(review): updateIndexKey() is defined outside this view; presumably it advances
            // currentIndexKey / currentPartitionPositionFromIndex / nextPartitionPositionFromIndex
            // to the next index entry - confirm.
            updateIndexKey();

            if (indexAvailable())
            {
                if (currentIndexKey != null)
                {
                    // Partition data starts after the serialized key; the "+ 2" presumably accounts for
                    // the short length prefix consumed by readWithShortLength - confirm.
                    dataStartFromIndex = currentPartitionPositionFromIndex + 2 + currentIndexKey.remaining();
                    dataSizeFromIndex = nextPartitionPositionFromIndex - dataStartFromIndex;
                }
            }

            // Position in the data file right after the key-read attempt above.
            long dataStart = dataFile.getFilePointer();

            String keyName = key == null ? "(unreadable key)" : keyString(key);
            outputHandler.debug("partition %s is %s", keyName, FBUtilities.prettyPrintMemory(dataSizeFromIndex));
            // This assertion sits outside any try block, so a failure propagates out of the method.
            assert currentIndexKey != null || !indexAvailable();

            // Step 2: validate what was read from the data file against the index, then append.
            try
            {
                if (key == null)
                    throw new IOError(new IOException("Unable to read partition key from data file"));

                if (currentIndexKey != null && !key.getKey().equals(currentIndexKey))
                {
                    // The data-file key is reported as a fixed placeholder instead of being hex-dumped
                    // (the commented-out line below is the variant that printed it).
                    // NOTE(review): presumably because a corrupt key can be very large - confirm.
                    throw new IOError(new IOException(String.format("Key from data file (%s) does not match key from index file (%s)",
                                                                    //ByteBufferUtil.bytesToHex(key.getKey()), ByteBufferUtil.bytesToHex(currentIndexKey))));
                                                                    "_too big_", ByteBufferUtil.bytesToHex(currentIndexKey))));
                }

                if (indexFile != null && dataSizeFromIndex > dataFile.length())
                    throw new IOError(new IOException("Impossible partition size (greater than file length): " + dataSizeFromIndex));

                // A position mismatch alone is only warned about; the append is still attempted.
                if (indexFile != null && dataStart != dataStartFromIndex)
                    outputHandler.warn("Data file partition position %d differs from index file row position %d", dataStart, dataStartFromIndex);

                if (tryAppend(prevKey, key, writer))
                    prevKey = key;
            }
            catch (Throwable th)
            {
                throwIfFatal(th);
                outputHandler.warn(th, "Error reading partition %s (stacktrace follows):", keyName);

                // Step 3: retry from the index, but only if the index offers something different from
                // what was just tried (unreadable key, different key, or different data position).
                if (currentIndexKey != null
                    && (key == null || !key.getKey().equals(currentIndexKey) || dataStart != dataStartFromIndex))
                {

                    outputHandler.output("Retrying from partition index; data is %s bytes starting at %s",
                                         dataSizeFromIndex, dataStartFromIndex);
                    // Replace the data-file key with the one recorded in the index.
                    key = sstable.decorateKey(currentIndexKey);
                    try
                    {
                        if (!cfs.metadata.getLocal().isIndex())
                            cfs.metadata.getLocal().partitionKeyType.validate(key.getKey());
                        // Reposition the data file to where the index says the partition data begins.
                        dataFile.seek(dataStartFromIndex);

                        if (tryAppend(prevKey, key, writer))
                            prevKey = key;
                    }
                    catch (Throwable th2)
                    {
                        throwIfFatal(th2);
                        throwIfCannotContinue(key, th2);

                        // Step 4: both attempts failed; count the partition as bad and skip ahead.
                        outputHandler.warn(th2, "Retry failed too. Skipping to next partition (retry's stacktrace follows)");
                        badPartitions++;
                        // Stop scrubbing when seekToNextPartition() returns false.
                        if (!seekToNextPartition())
                            break;
                    }
                }
                else
                {
                    // No retry possible: there is no index key, or the index agrees with the failed attempt.
                    throwIfCannotContinue(key, th);

                    outputHandler.warn("Partition starting at position %d is unreadable; skipping to next", dataStart);
                    badPartitions++;
                    // Without an index key no seek is attempted; the loop simply continues from the
                    // data file's current position.
                    if (currentIndexKey != null)
                        if (!seekToNextPartition())
                            break;
                }
            }
        }
    }