in src/main/java/com/uber/rss/tools/SerializerBenchmark.java [81:224]
public void run(int numIterations) {
ClassTag<HashMap<String, String>> classTag = scala.reflect.ClassTag$.MODULE$.apply(HashMap.class);
long startTime = System.nanoTime();
long serializedTotalBytes = 0;
for (int i = 0; i < numIterations; i++) {
byteArrayOutputStream.reset();
// serialize objects and write to output stream
for (HashMap<String, String> v : testObjects) {
// serialize object
ByteBuffer byteBuffer = serializer.serialize(v, classTag);
// write object size to output stream
int objectSize = byteBuffer.remaining();
int byte1 = objectSize & 0xFF;
int byte2 = (objectSize >> 8) & 0xFF;
int byte3 = (objectSize >> 8*2) & 0xFF;
int byte4 = (objectSize >> 8*3) & 0xFF;
byteArrayOutputStream.write(byte1);
byteArrayOutputStream.write(byte2);
byteArrayOutputStream.write(byte3);
byteArrayOutputStream.write(byte4);
// write serialized bytes to output stream
while (byteBuffer.remaining() > 0) {
int num = Math.min(byteBuffer.remaining(), bytesBuffer.length);
byteBuffer.get(bytesBuffer, 0, num);
byteArrayOutputStream.write(bytesBuffer, 0, num);
}
}
try {
byteArrayOutputStream.flush();
byteArrayOutputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
// read objects and deserialize
byte[] streamBytes = byteArrayOutputStream.toByteArray();
serializedTotalBytes += streamBytes.length;
int numReadObjects = 0;
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(streamBytes);
int byte1 = byteArrayInputStream.read();
while (byte1 != -1) {
int byte2 = byteArrayInputStream.read();
int byte3 = byteArrayInputStream.read();
int byte4 = byteArrayInputStream.read();
int objectSize = byte1 | byte2 << 8 | byte3 << 8*2 | byte4 << 8*3;
if (bytesBuffer.length < objectSize) {
throw new RuntimeException("Too small byte array size");
}
byteArrayInputStream.read(bytesBuffer, 0, objectSize);
HashMap<String, String> object = serializer.deserialize(ByteBuffer.wrap(bytesBuffer), classTag);
if (object == null) {
throw new RuntimeException("Got null after deserialize");
}
numReadObjects ++;
byte1 = byteArrayInputStream.read();
}
try {
byteArrayInputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
if (numReadObjects != testObjects.size()) {
throw new RuntimeException("Invalid number of objects");
}
}
long singleSerializeTotalTime = System.nanoTime() - startTime;
long singleSerializeTotalBytes = serializedTotalBytes;
startTime = System.nanoTime();
serializedTotalBytes = 0;
for (int i = 0; i < numIterations; i++) {
byteArrayOutputStream.reset();
// create serialize stream and write objects into it
SerializationStream serializationStream = serializer.serializeStream(byteArrayOutputStream);
for (HashMap<String, String> v : testObjects) {
serializationStream.writeObject(v, classTag);
}
serializationStream.flush();
serializationStream.close();
byte[] streamBytes = byteArrayOutputStream.toByteArray();
serializedTotalBytes += streamBytes.length;
// create deserialize stream and read objects from it
int numReadObjects = 0;
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(streamBytes);
DeserializationStream deserializationStream = serializer.deserializeStream(byteArrayInputStream);
HashMap<String, String> object = deserializationStream.readObject(classTag);
while (object != null) {
numReadObjects++;
try {
object = deserializationStream.readObject(classTag);
} catch (Throwable ex) {
if (ex instanceof EOFException) {
break;
} else {
throw ex;
}
}
}
try {
byteArrayInputStream.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
if (numReadObjects != testObjects.size()) {
throw new RuntimeException("Invalid number of objects");
}
}
long streamSerializeTotalTime = System.nanoTime() - startTime;
long streamSerializeTotalBytes = serializedTotalBytes;
logger.info(String.format("Number of objects: %s, single serialize total seconds: %s (%s bytes), stream serialize total seconds: %s (%s bytes)",
testObjects.size(),
TimeUnit.NANOSECONDS.toSeconds(singleSerializeTotalTime),
singleSerializeTotalBytes,
TimeUnit.NANOSECONDS.toSeconds(streamSerializeTotalTime),
streamSerializeTotalBytes));
}