in cassandra-four-zero/src/main/java/org/apache/cassandra/spark/reader/ReaderUtils.java [256:337]
/**
 * Deserializes the selected metadata components from a stats-metadata stream.
 *
 * <p>The stream is read strictly sequentially: first the component count, then one
 * (ordinal, offset) pair per component, then the component payloads in the order listed.
 * The length of every component except the last is derived from the distance between
 * consecutive offsets; the last component is obtained by draining the stream, because a plain
 * {@link InputStream} does not expose the number of bytes remaining.
 *
 * <p>When {@code descriptor.version.hasMetadataChecksum()} is true each payload is followed by
 * a CRC32 value that is checked against the payload bytes; the count and the table of contents
 * are passed to {@code maybeValidateChecksum} as well.
 *
 * @param is            stream positioned at the start of the serialized metadata
 * @param selectedTypes metadata types to deserialize; components of any other type are skipped
 * @param descriptor    descriptor of the SSTable; its version drives checksum handling and
 *                      component deserialization
 * @return map from metadata type to deserialized component, containing only selected types
 *         that are present in the stream
 * @throws IOException if the stream ends prematurely, or the component count or a derived
 *                     component length is invalid
 */
static Map<MetadataType, MetadataComponent> deserializeStatsMetadata(InputStream is,
                                                                     EnumSet<MetadataType> selectedTypes,
                                                                     Descriptor descriptor) throws IOException
{
    DataInputStream in = new DataInputPlus.DataInputStreamPlus(is);
    boolean isChecksummed = descriptor.version.hasMetadataChecksum();
    CRC32 crc = new CRC32();

    int count = in.readInt();
    updateChecksumInt(crc, count);
    maybeValidateChecksum(crc, in, descriptor);
    // Checked only after the checksum so that checksum failures keep precedence;
    // a non-positive count would otherwise fail below with an unhelpful runtime exception
    if (count <= 0)
    {
        throw new IOException("Invalid metadata component count " + count + " in " + descriptor);
    }

    // Table of contents: ordinal of the metadata type and offset of its payload
    int[] ordinals = new int[count];
    int[] offsets = new int[count];
    for (int index = 0; index < count; index++)
    {
        ordinals[index] = in.readInt();
        updateChecksumInt(crc, ordinals[index]);
        offsets[index] = in.readInt();
        updateChecksumInt(crc, offsets[index]);
    }
    maybeValidateChecksum(crc, in, descriptor);

    MetadataType[] allMetadataTypes = MetadataType.values();
    Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class);

    // All components but the last: the length is the gap to the next offset
    for (int index = 0; index < count - 1; index++)
    {
        int length = offsets[index + 1] - offsets[index];
        MetadataType type = allMetadataTypes[ordinals[index]];
        if (!selectedTypes.contains(type))
        {
            // skipBytes may skip fewer bytes than requested and never signals EOF, so keep
            // going until the whole component is consumed; otherwise every following
            // component would be read from the wrong position
            int remaining = length;
            while (remaining > 0)
            {
                int skipped = in.skipBytes(remaining);
                if (skipped <= 0)
                {
                    // Force progress; throws EOFException if the stream is exhausted
                    in.readByte();
                    skipped = 1;
                }
                remaining -= skipped;
            }
            continue;
        }

        // The gap between offsets includes the trailing checksum when one is present
        int payloadLength = isChecksummed ? length - CHECKSUM_LENGTH : length;
        if (payloadLength < 0)
        {
            throw new IOException("Invalid length " + length + " for metadata component " + type
                                  + " in " + descriptor);
        }
        byte[] bytes = new byte[payloadLength];
        in.readFully(bytes);
        crc.reset();
        crc.update(bytes);
        maybeValidateChecksum(crc, in, descriptor);
        components.put(type, deserializeMetadataComponent(descriptor.version, bytes, type));
    }

    MetadataType type = allMetadataTypes[ordinals[count - 1]];
    if (!selectedTypes.contains(type))
    {
        return components;
    }

    // We do not have in.bytesRemaining() (as in FileDataInput),
    // so need to read remaining bytes to get final component
    byte[] remainingBytes = ByteBufferUtils.readRemainingBytes(in, 256);
    byte[] bytes;
    if (isChecksummed)
    {
        // The final int of the stream is the checksum of the preceding payload bytes
        int length = remainingBytes.length - Integer.BYTES;
        if (length < 0)
        {
            throw new IOException("Metadata component " + type + " is truncated in " + descriptor);
        }
        ByteBuffer buffer = ByteBuffer.wrap(remainingBytes);
        bytes = new byte[length];
        buffer.get(bytes, 0, length);
        crc.reset();
        crc.update(bytes);
        validateChecksum(crc, buffer.getInt(), descriptor);
    }
    else
    {
        bytes = remainingBytes;
    }
    components.put(type, deserializeMetadataComponent(descriptor.version, bytes, type));
    return components;
}