in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/CompressedRawInputStream.java [112:189]
/**
 * Reads the compressed bytes of {@code chunk} from {@code source}, optionally verifies their CRC, and
 * decompresses them into {@code buffer}.
 * <p>
 * Any gap between the current compressed position and {@code chunk.offset} is skipped first. A chunk whose
 * length is not positive is treated as the last chunk: its length is unknown, so the rest of the stream is
 * read and the chunk length is set to the number of bytes read minus the trailing checksum.
 *
 * @param chunk     the chunk to read; its length is set in place when it is the last chunk
 * @param crcChance chance of verifying the chunk checksum; the check runs when this value exceeds a
 *                  uniformly random double in [0, 1)
 * @throws IOException            if the stream ends (or stops advancing) before the chunk is fully read,
 *                                or if the last chunk is too short to contain its checksum
 * @throws ChunkCorruptException  if the checksum is verified and does not match the chunk data
 */
private void decompressChunk(CompressionMetadata.Chunk chunk, double crcChance) throws IOException
{
    assertChunkPos(chunk);
    skipCompressedBytes(chunk.offset - currentCompressed);
    currentCompressed = chunk.offset;

    int checksumFromChunk;
    if (chunk.length > 0)
    {
        if (compressed.length < chunk.length)
        {
            compressed = new byte[chunk.length];
        }
        try
        {
            source.readFully(compressed, 0, chunk.length);
        }
        catch (EOFException exception)
        {
            throw new IOException(String.format("Failed to read %d bytes from offset %d.",
                                                chunk.length, chunk.offset), exception);
        }
        checksumFromChunk = source.readInt();
        stats.readBytes(chunk.length + checksumBytes.length);  // 4 bytes for CRC
    }
    else
    {
        // Last block: its length is not recorded, so read until the end of the stream. That also reads the
        // trailing checksum, which is then taken from the end of the buffer
        int lastBytesLength = readToEndOfStream();
        if (lastBytesLength < checksumBytes.length)
        {
            throw new IOException(String.format("Last chunk at offset %d has only %d bytes, fewer than its %d-byte checksum.",
                                                chunk.offset, lastBytesLength, checksumBytes.length));
        }
        chunk.setLength(lastBytesLength - checksumBytes.length);
        checksumFromChunk = ByteBufferUtil.toInt(ByteBuffer.wrap(compressed, chunk.length, checksumBytes.length));
    }

    // Verify the CRC before decompressing, so corrupt input is reported as a corrupt chunk instead of
    // surfacing as an arbitrary failure (or garbage output) from the decompressor
    if (crcChance > ThreadLocalRandom.current().nextDouble())
    {
        try
        {
            checksum.update(compressed, 0, chunk.length);
            if (checksumFromChunk != (int) checksum.getValue())
            {
                throw new ChunkCorruptException("bad chunk " + chunk);
            }
        }
        finally
        {
            // Always return the checksum object to its blank state, including when the chunk is corrupt
            checksum.reset();
        }
    }

    validBufferBytes = metadata.compressor().uncompress(compressed, 0, chunk.length, buffer, 0);
    stats.decompressedBytes(chunk.length, validBufferBytes);
    currentCompressed += chunk.length + checksumBytes.length;
    alignBufferOffset();
}

/**
 * Skips exactly {@code count} bytes of {@code source}.
 * <p>
 * {@code skipBytes} takes an {@code int} and may skip fewer bytes than requested, so the gap is skipped in
 * {@code int}-sized steps. A step that makes no progress is reported instead of leaving the stream
 * silently misaligned.
 *
 * @param count number of bytes to skip; non-positive values skip nothing
 * @throws EOFException if the stream stops advancing before {@code count} bytes have been skipped
 */
private void skipCompressedBytes(long count) throws IOException
{
    long remaining = count;
    while (remaining > 0)
    {
        int skipped = source.skipBytes((int) Math.min(remaining, Integer.MAX_VALUE));
        if (skipped <= 0)
        {
            throw new EOFException(String.format("Failed to skip %d bytes from offset %d; %d bytes were not skipped.",
                                                 count, currentCompressed, remaining));
        }
        remaining -= skipped;
    }
}

/**
 * Reads {@code source} until it reports end of stream, storing the bytes in {@code compressed} from index 0
 * and growing that array as needed. Each read is reported to {@code stats}.
 *
 * @return the total number of bytes read
 */
private int readToEndOfStream() throws IOException
{
    int total = 0;
    while (true)
    {
        if (total >= compressed.length)
        {
            // Double the array; the lower bound keeps an empty array from "growing" to zero length, which
            // would make every read request zero bytes and loop forever
            byte[] grown = new byte[Math.max(total * 2, 1)];
            System.arraycopy(compressed, 0, grown, 0, total);
            compressed = grown;
        }
        int readLength = source.read(compressed, total, compressed.length - total);
        if (readLength < 0)
        {
            return total;
        }
        stats.readBytes(readLength);
        total += readLength;
    }
}