= Harry, an Open Source Fuzz Testing and Verification Tool for Apache Cassandra
:page-layout: single-post
:page-role: blog-post
:page-post-date: November 18, 2021
:page-post-author: Alex Petrov
:description: The Apache Cassandra Community
:keywords:
Over the years of working on Apache Cassandra, whether writing tests or trying to reproduce issues, I’ve always found myself repeating the same procedure over and over again: creating a schema, writing loops that generate data, then either manually reconciling the data to check the results or comparing the result set against some predetermined expected result. Not only is this approach tedious and time-consuming, but it also does not scale: if some set of operations works for one schema, there’s no way to know whether it will also work for an arbitrary schema, whether it will work if the operations are executed in a different order, or whether it will work if the operations themselves are slightly different.
While preparing Apache Cassandra for the 4.0 release, we’ve made extensive
progress in how we test. The new in-tree in-JVM distributed test
framework enables us to easily write tests that exercise coordinated
query execution code paths while giving us flexibility and control
that was previously offered only by CQLTester, a tool for exercising
node-local query paths. Many subsystems were audited and covered with
tests. Cassandra users tried the new version out in their clusters and
reported their findings. All of these things are useful and important,
but we still needed a tool that would give us the same or a higher degree of confidence for every commit, so that we could know that the database is working as expected not only for the exact set of operations exercised by unit and integration tests, but potentially for any use case and combination of operations under circumstances comparable to production.
This all led us to develop Harry, a tool that combines the properties of stress- and integration-testing tools: it can generate data for an arbitrary schema, execute data modification queries against the cluster, track the progress of operation execution, and make sure that responses to read queries are correct.
After reading this post, you will understand:
* how Harry generates the data
* how Harry performs verification
* which properties of the values generated by Harry make verification not only possible but also efficient.
The primary audience for this post is Cassandra contributors, so you
will need to be familiar with Apache Cassandra and its tooling.
== Fuzz testing
Since most bugs are reproduced by taking a sequence of actions that follows some pattern, we need to specify which actions lead to a given state. However, there’s a lot of flexibility in exactly which values are written to specific columns.
For example, take
https://issues.apache.org/jira/browse/CASSANDRA-16453[CASSANDRA-16453,window=_blank],
which was reproduced using Harry. The code to reproduce the issue with
in-JVM dtests looks something like this:
.Repro.java
[source,java]
----
try (Cluster cluster = init(builder().withNodes(2).start()))
{
    cluster.schemaChange(withKeyspace("CREATE TABLE distributed_test_keyspace.table_0 (pk0 bigint,ck0 bigint,regular0 bigint,regular1 bigint,regular2 bigint, PRIMARY KEY (pk0, ck0)) WITH CLUSTERING ORDER BY (ck0 ASC);"));
    cluster.coordinator(1).execute("DELETE FROM distributed_test_keyspace.table_0 USING TIMESTAMP 1 WHERE pk0=1 AND ck0>2;", ConsistencyLevel.ALL);
    cluster.get(2).executeInternal("DELETE FROM distributed_test_keyspace.table_0 USING TIMESTAMP 1 WHERE pk0=1;");
    cluster.coordinator(1).execute("SELECT * FROM distributed_test_keyspace.table_0 WHERE pk0=1 AND ck0>=1 AND ck0<3;",
                                   ConsistencyLevel.ALL, 1L, 1L, 3L);
}
----
You can see that, at first glance, only three things are important to reproduce the issue:
1. The table has to have at least one clustering column
2. Two actions are executed against the cluster: a range deletion, and a partition deletion
3. Both operations have the same timestamp
The rest of the details do not matter: size of the cluster, number of
replicas, clustering order, consistency level with which operations
are executed, types of clustering keys and values written, and so on.
The simplest way to cover a case like this with a test is to hardcode
the schema and then execute a partition deletion and a range deletion
hardcoding the values, much as we did above. This might work, but
there’s still a chance that the proposed fix may not work for some
other schema or some combination of values.
To improve the situation, we can express the test in more abstract terms and, instead of writing a repro using specific statements, use only the constraints we’ve specified above:
.HarryDsl.java
[source,java]
----
test(new SchemaGenerators.Builder("harry")
                         .partitionKeySpec(1, 5)
                         .clusteringKeySpec(1, 5)
                         .regularColumnSpec(1, 10)
                         .generator(),
     historyBuilder -> {
         historyBuilder.nextPartition()
                       .simultaneously()
                       .randomOrder()
                       .partitionDeletion()
                       .rangeDeletion()
                       .finish();
     });
----
This spec can be used to generate clusters of different sizes,
configured with different schemas, executing the given sequence of
actions both in isolation and combined with other randomly generated
ones, with failure injection. Best of all, this test will _not only_ ensure that such a sequence of actions does not produce an exception, but will _also_ ensure that the cluster responds with correct results to _any_ allowed read query.
== Generating data
Generating random values and sequences of actions and reconciling them
during verification is in itself not a difficult task. Making this
process time- and memory-efficient is what makes it more interesting.
For space efficiency, the log of actions generated using Harry is not
kept in memory or saved anywhere on disk since any generated operation
can be reproduced from its sequence number. In Harry, a sequence
number consists of two parts: the logical timestamp (LTS), which has a 1:1 mapping to a real-time timestamp, and the modification ID, which allows multiple uniquely identifiable operations for each logical timestamp. For the sake of simplicity, we’ll just say that each operation is represented by its sequence number / LTS.
In the example above, the operation order is determined by the seed
for the given run. Let’s say that partition deletion is executed
first. To produce a `DELETE` statement from it, we now need to
generate a partition key and get a timestamp. Similarly, to generate a
range deletion, we will need a partition key, two clustering keys that
will serve as lower and higher bounds for the range tombstone, and a
timestamp.
Using the sequence number and knowing the operation type, we can now
produce _descriptors_ that are used as the compact internal
representation of data in Harry. No matter how many parts it consists
of, any partition key is represented by a single `long`. The same is
true for the clustering keys: any clustering key, single-part or
composite, is represented using a single `long` descriptor. If we were
to generate an `INSERT` or `UPDATE` operation, each value for a
regular or a static column would have its own descriptor since we
would want to distinguish between two writes made by two different
operations.
To summarise, every operation has a sequence number, which determines
everything that is required to fully reproduce this operation,
including descriptors that we will later use to generate values
themselves:
* partition deletion only has a partition descriptor
* range deletion has a partition descriptor and two clustering descriptors, specifying tombstone bounds
* insert or update operation has a partition descriptor, a clustering descriptor, and a set of value descriptors, one for each regular and static column.
Using descriptors rather than specific values for verification can be extremely useful for efficiency. Instead of comparing potentially large values, we can just compare the two longs that uniquely identify them. This means we need a way not _only_ to generate a value from a descriptor, but _also_ to compute the descriptor a value was generated from.
In Harry, we call such a generator `Bijection<T>`: every bijection can _inflate_ a descriptor into a value of type `T` and _deflate_ a value of type `T` back into the descriptor it was originally generated from.
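A minimal sketch of what that contract could look like follows; the interface is illustrative, and Harry’s actual API may differ in its details.
[source,java]
----
// Illustrative contract for an order-preserving bijection between 64-bit
// descriptors and values of type T.
public interface Bijection<T> {
    T inflate(long descriptor); // descriptor -> value
    long deflate(T value);      // value -> the descriptor it was generated from

    // Order preservation: for any two descriptors a and b, the inflated values
    // must compare the same way as Long.compare(a, b).
}
----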
== Validating results
Applying a predetermined sequence of operations against a single
partition produces some partition state. Knowing the status of
execution of each operation, we can deterministically determine the
state of each node in the cluster and validate the results of
execution of any `SELECT` query.
We can represent any operation as a sequence of descriptors, and we know the order of operations, since the timestamp determines it. Assuming we also know the status of each operation (whether or not it has been executed against some node), we can deterministically produce the partition state for any given point in time. Partition state is nothing but a sorted map, where the key is a clustering descriptor and the value is a row state. A row state, in this case, holds the value descriptors for each column and the logical timestamps at which they were written:
.PartitionState.java
[source,java]
----
public class PartitionState implements Iterable<RowState> {
    long partitionDescriptor;
    NavigableMap<Long, RowState> rowStates;

    public static class RowState {
        long[] valueDescriptors;
        long[] logicalTimestamps;
    }
}
----
Similarly, since any value written to the database is generated using
a bijection, we can produce the partition state from the result set by
deflating every value returned by the database into the descriptor
that it was generated from.
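As a rough illustration (not Harry’s actual code), deflating one returned row could look like the sketch below, reusing the hypothetical `Bijection` contract sketched earlier; `clusteringGen` and `valueGens` stand in for the key and column bijections.
[source,java]
----
import java.util.NavigableMap;

// Hypothetical helper: turn one row returned by the database back into
// descriptors and store it under its clustering descriptor.
class ResultSetDeflater {
    static void deflateRow(NavigableMap<Long, long[]> rowStates,
                           Bijection<Object[]> clusteringGen,
                           Bijection<Object>[] valueGens,
                           Object[] clusteringKey,
                           Object[] columnValues) {
        long cd = clusteringGen.deflate(clusteringKey);     // clustering key -> descriptor
        long[] vds = new long[columnValues.length];
        for (int i = 0; i < columnValues.length; i++)
            vds[i] = valueGens[i].deflate(columnValues[i]); // value -> descriptor
        rowStates.put(cd, vds);                             // keyed by clustering descriptor
    }
}
----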
== Generating Descriptors
Reproducible operation sequences can be generated from a set of rules
that determines what the sequence is going to look like. For example,
we can specify probability distributions for each operation type or
give operations relative weights, which can be turned into the
distribution internally later. Configuration for an insert / update / delete workload, in which an insert operation (100/251) is twice as probable as a row deletion (50/251) and one hundred times as probable as a partition deletion (1/251), would look like this:
----
INSERT: 100
UPDATE: 100
DELETE_ROW: 50
DELETE_PARTITION: 1
----
Since each operation is uniquely determined by its sequence number, we
can deterministically compute its operation type by taking these probability distributions into account. One way to do this is by using a PCG random number generator, which has some useful properties we’re going to use for generating our pseudorandom values.
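For illustration only (this is a sketch rather than Harry’s code), the weights above can be folded into a deterministic choice like this, where `randomNumber` is assumed to come from the addressable random number generator described below:
[source,java]
----
// Illustrative only: map a pseudorandom number deterministically onto an
// operation kind using the relative weights from the configuration above.
enum OperationKind { INSERT, UPDATE, DELETE_ROW, DELETE_PARTITION }

class OperationKindPicker {
    static final long[] WEIGHTS = { 100, 100, 50, 1 }; // same order as OperationKind.values()
    static final long TOTAL = 251;                     // sum of the weights

    static OperationKind operationKind(long randomNumber) {
        long roll = Math.floorMod(randomNumber, TOTAL); // uniform value in [0, 251)
        long cumulative = 0;
        OperationKind[] kinds = OperationKind.values();
        for (int i = 0; i < kinds.length; i++) {
            cumulative += WEIGHTS[i];
            if (roll < cumulative)
                return kinds[i];
        }
        throw new IllegalStateException("weights and kinds are out of sync");
    }
}
----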
If you’d like to learn more about the mathematical underpinnings of PCG, you should read the https://www.pcg-random.org/pdf/hmc-cs-2014-0905.pdf[PCG paper,window=_blank]. However, to use PCG it is not necessary to know any of its internals. We need a random number generator with the following properties (a sketch of the resulting contract follows the list):
* Long period: the sequence of numbers it produces does not repeat frequently; ideally, the period should be 2^64 when generating a random number from 64 bits of entropy
* Stream selection: the ability to produce different random
sequences from the same seed, identified by some stream id.
* Addressability: any number produced by the generator can be
reproduced from the seed and its sequence number. Ideally, we’d
like to have methods such as `long randomNumber(long
sequenceNumber, long stream)` and `long sequenceNumber(long
randomNumber, long stream)`. In other words, we should be able to
determine the sequence number of the random number in the given
stream. Using this method, we can also determine `distance(long x,
long y)` : how many random numbers we should skip to get `y` after
seeing `x`.
* Walkability: the ability to produce the number immediately following (`long next(long randomNumber, long stream)`) or immediately preceding (`long prev(long randomNumber, long stream)`) a given random number in the random sequence.
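Put together, the contract might look like the sketch below; the interface itself is illustrative, and the method names simply mirror the ones used in the list above.
[source,java]
----
// Illustrative contract for an addressable, walkable PRNG with stream selection.
public interface AddressableRng {
    long randomNumber(long sequenceNumber, long stream); // i-th number of the given stream
    long sequenceNumber(long randomNumber, long stream); // inverse: position of a number in its stream
    long next(long randomNumber, long stream);           // number that follows the given one
    long prev(long randomNumber, long stream);           // number that precedes the given one

    // Distance between two numbers of the same stream, derived from addressability.
    default long distance(long x, long y, long stream) {
        return sequenceNumber(y, stream) - sequenceNumber(x, stream);
    }
}
----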
You might have noticed that there are two ways to achieve the same
thing: we can get a pseudorandom number from some number known to the system either by using `randomNumber(i, stream)` or by using `prev(i, stream)`. Both variants are valid, and both operations can be
inverted. We have a slight preference toward using `prev`, since its
inverse can be computed in constant time.
These properties allow us to reproduce partition state from just the configuration (i.e., known distributions, schema, partition size, etc.) and a seed:
* Partition descriptor for `N` th operation can be picked as `M` th
random number in the stream of partition descriptors, and the
relation between `N` and `M` is determined by the chosen pattern
for visiting partitions.
* Clustering descriptor for `N` th operation can be picked as `M` th
random number in the stream of clustering descriptors **for the
given partition**, where maximum `M` is determined by the maximum
partition size, so there can be no more than `max(M)` rows in any
generated partition.
One of the simplest useful ways to represent a pattern for picking a descriptor from the sequence is a sliding window. The sliding window begins with a preset number of items in it and visits each item in the current window one or several times in a round-robin fashion. After this, it cycles one of the items out and adds a new one in its place.
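As a rough illustration (not Harry’s implementation, and the parameter names are hypothetical), a sliding-window position rule, one possible shape of the `positionFor` function referenced in the summary below, could look like this:
[source,java]
----
// Illustrative sliding window over a descriptor stream: a window of
// `windowSize` consecutive stream positions is visited round-robin,
// `visitsPerItem` times per item, before the window slides forward by one.
static long positionFor(long opIndex, long windowSize, long visitsPerItem) {
    long opsPerWindow = windowSize * visitsPerItem;      // operations before the window slides
    long slides = opIndex / opsPerWindow;                // how many times the window has slid
    long offset = (opIndex % opsPerWindow) % windowSize; // round-robin within the window
    return slides + offset;                              // position in the descriptor stream
}
----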
Once the operation type, partition descriptor, and clustering descriptors are determined, all that is left to cover is how to generate value descriptors for `INSERT` and `UPDATE` operations. The value descriptor for a column is uniquely identified by its sequence number and is bound to the partition descriptor, the clustering descriptor, and the column.
To summarise, all operations in Harry are deterministic and are
represented using their descriptors. Descriptors can be computed
hierarchically using the following rules:
* Partition descriptor is picked from the _stream_ of partition descriptors. Its position in that stream is determined by some rule (for example, a sliding window):
[source,java]
----
long pd = rng.randomNumber(positionFor(sequenceNumber), PARTITION_DESCRIPTOR_STREAM_ID);
----
* Clustering descriptor is picked from the _stream_ of clustering descriptors **for the given partition**.
[source,java]
----
long cd = rng.prev(positionInPartition, pd);
----
* Value descriptor is picked from the _stream_ of descriptors identified by which partition, clustering, and column the value belongs to:
[source,java]
----
long vd = rng.randomNumber(sequenceNumber, pd ^ cd ^ col);
----
== Inflation and Deflation
We’ve mentioned before that one reason Harry’s state is so compact and can be validated so efficiently is that every value read from the
database can be traced back to the descriptor it was generated
from. To achieve this, we generate all values using order-preserving
bijections. In other words, for any value generated from a descriptor,
it should be possible to quickly find the descriptor this value was generated from, and two values generated from two distinct descriptors should sort the same way as the descriptors themselves.
Implementing an order-preserving bijection for 64-bit longs is trivial
and can be achieved by using an identity function. Essentially, any
long descriptor _is_ the value it represents:
[source,java]
----
long inflate(long descriptor) {
    return descriptor;
}

long deflate(long value) {
    return value;
}
----
There are many ways to make a bijection for strings. One of the ways
to do it is to have a set of 256 short strings of the same length in a
sorted array. When inflating a 64-bit long descriptor into a string, we iterate over these 64 bits, taking 8 bits (one byte) at a time and using the value of each byte as an index into the array of 256 strings.
[source,java]
----
String inflate(long descriptor) {
    StringBuilder builder = new StringBuilder();
    for (int i = 0; i < Long.BYTES; i++) {
        // nibbles is the sorted array of 256 equal-length strings described above;
        // getByte extracts the i-th byte of the descriptor
        int idx = getByte(descriptor, i);
        builder.append(nibbles[idx]);
    }
    return builder.toString();
}
----
One thing we should take into account here is that strings are
compared byte-wise, while longs use signed comparison. To make sure
generated strings have the same order as descriptors, we need to XOR
the sign bit.
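Concretely, this amounts to flipping the top bit before slicing the descriptor into bytes, a standard trick shown here as a small sketch:
[source,java]
----
// XOR-ing the sign bit maps signed long order onto unsigned, byte-wise order:
// Long.MIN_VALUE becomes 0x0000000000000000 and Long.MAX_VALUE becomes
// 0xFFFFFFFFFFFFFFFF, so byte-wise comparison of the result agrees with
// signed comparison of the original descriptor.
static long flipSignBit(long descriptor) {
    return descriptor ^ Long.MIN_VALUE;
}
----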
Since any two strings produced by this generator will be unique, and
we can produce at most 2^64 values using this generator, to generate
longer strings we do not even need larger nibbles. We can append
random data of arbitrary length to the end of the string. This does
not change the order, since the order is determined by the prefix generated from the nibbles, which is unique to each value.
Such simple bijections can represent data types used for regular and
static columns. We’ve previously mentioned that partition and
clustering keys are also represented using 64-bit
descriptors. Partition and clustering keys are composite: they consist
of multiple distinct parts. One way to implement a bijection for a
composite type is to “slice” 64 bits of entropy into smaller chunks,
each chunk giving some entropy to generate a different part of the
key. Each slice is then inflated using a bijection that corresponds to
the part of the key it represents. To convert the value back to the
descriptor, we must deflate each part of the key and then “stitch” the
values back together into a 64-bit descriptor.
To summarise, key generators are just bijections that can generate
multiple values for a single 64-bit descriptor instead of one. A
simplified and generalized version of such a bijection may look
something like this:
[source,java]
----
Object[] inflate(long descriptor) {
    long[] slices = slice(descriptor);
    Object[] key = new Object[slices.length];
    for (int i = 0; i < slices.length; i++) {
        key[i] = children[i].inflate(slices[i]);
    }
    return key;
}

long deflate(Object[] value) {
    long[] slices = new long[value.length];
    for (int i = 0; i < value.length; i++) {
        slices[i] = children[i].deflate(value[i]);
    }
    return stitch(slices);
}
----
Values generated by key generators preserve the order of descriptors
they were generated from, which allows efficiently checking the order
of results, comparing clustering descriptors, and validating range
deletions.
== Putting it all together
In this post, we’ve learned how the various parts of Harry work,
from how to reproduce a sequence of operations to how the values themselves are generated. Using this information, we can create a
quiescent model checker that can validate the state of the database in
the absence of in-flight operations, assuming we know the state of all
operations before this moment.
As we’ve discussed, Harry works with reproducible histories of operations, where each operation is identified by the following information:
[source,java]
----
class Operation {
    long lts;  // logical timestamp of the operation
    long pd;   // partition descriptor, derived from LTS
    long cd;   // clustering descriptor, derived from LTS and PD
    long kind; // operation type (OperationKind), derived from LTS and PD
}
----
Now, all we need to do is produce a sequence of operations. For example, each operation of the `INSERT` kind is represented by:
[source,java]
----
class Insert {
    long lts;   // logical timestamp
    long pd;    // partition descriptor
    long cd;    // clustering descriptor
    long[] vds; // value descriptors for each column
}
----
We can compile this information into an `INSERT` statement by inflating the partition, clustering, and value descriptors, and deriving the
real-time timestamp from the logical timestamp. This statement can
then be executed against the system under test (our Cassandra
cluster).
We know from the history of operations which partitions were visited and which operations were executed against them. To validate the state of any given partition, all we need to do is query the database for that partition and deflate every row returned in the results: clustering keys into clustering descriptors and values into the corresponding value descriptors, producing an internal `PartitionState`.
To verify that this partition state is correct, we replay the sequence of operations again. But instead of going all the way to generated `INSERT` statements, we operate on descriptors only and apply the operations sequentially to a `PartitionState`, following the usual Cassandra reconciliation rules (i.e. last-write-wins, partition tombstone > tombstone > write) and computing the logical timestamps and value descriptors for each row.
Now, having two partition states, one “deflated” from the result set returned by the system under test and one “inflated” by replaying the logical operation sequence, we can compare them directly. If there are any inconsistencies between the two, we can conclude there has been an error.
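As a rough sketch (assuming, purely for illustration, that a partition state is represented as a `NavigableMap` from clustering descriptor to the row’s value descriptors), the comparison boils down to walking both maps:
[source,java]
----
import java.util.Arrays;
import java.util.Map;
import java.util.NavigableMap;

// Illustrative comparison of a modeled ("inflated") and an observed
// ("deflated") partition state; not Harry's actual checker.
class PartitionStateComparison {
    static void assertMatches(NavigableMap<Long, long[]> modeled,
                              NavigableMap<Long, long[]> observed) {
        if (!modeled.keySet().equals(observed.keySet()))
            throw new AssertionError("Row sets differ: expected " + modeled.keySet()
                                     + " but got " + observed.keySet());
        for (Map.Entry<Long, long[]> entry : modeled.entrySet()) {
            long[] expected = entry.getValue();
            long[] actual = observed.get(entry.getKey());
            if (!Arrays.equals(expected, actual))
                throw new AssertionError("Value descriptors differ for clustering descriptor "
                                         + entry.getKey());
        }
    }
}
----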
Validating query results by reproducing the sequence of events is more
efficient than working with full operation logs holding all values for
every operation executed against the database state. This process can
be made even more efficient by validating rows individually, keeping only enough state in memory to validate a single row at a time.
Supporting other Cassandra features, such as partition deletions, writes without primary key liveness, static columns, column deletions, and so on, does not require any additional information and follows the same rules, so for the sake of brevity it is not covered in this post.
The quiescent checker is a very useful tool: it can validate data sets generated by sequential or concurrent runners, as long as the state of each operation is known at the moment of validation. Since we simply replay the sequence of events to reproduce the state, there can be no events whose state is unknown to the system.
== Closing words
Harry is a tool that allows us to test databases in ways we weren’t able to before. Instead of creating test cases expressed as specific schemas and statements, it enables us to describe the sequences of events we’d like tested and to generate schemas and possible interleavings of statements corresponding to those specifications. Creating exhaustive test suites can take a lot of time, and this way contributor creativity can be poured into patches and features, not into test examples. This approach also allows testing
different features in combination and checking old behaviors in the
presence of new ones without explicitly creating new tests.
That said, integration testing is not always enough, and often we do
not know which areas of the codebase can be problematic. Harry can be
helpful here, too, since it will generate data, execute the workload
against the system and validate the results. Contributors can focus on
implementing test scenarios, such as behavior in the presence of unexpected failures, cluster expansions, or node replacements.
Harry is a productivity tool that encapsulates the properties of a
stress test and a model checker, allowing us to find issues and
improve the quality of Cassandra in ways that were not possible
before.