harry-core/src/harry/model/QuiescentChecker.java (217 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package harry.model;
import java.util.*;
import java.util.function.Supplier;
import harry.core.Run;
import harry.data.ResultSetRow;
import harry.ddl.ColumnSpec;
import harry.ddl.SchemaSpec;
import harry.model.sut.SystemUnderTest;
import harry.reconciler.PartitionState;
import harry.operations.Query;
import harry.reconciler.Reconciler;
import harry.runner.DataTracker;
import static harry.generators.DataGenerators.NIL_DESCR;
import static harry.generators.DataGenerators.UNSET_DESCR;
public class QuiescentChecker implements Model
{
protected final OpSelectors.MonotonicClock clock;
protected final DataTracker tracker;
protected final SystemUnderTest sut;
protected final Reconciler reconciler;
protected final SchemaSpec schema;
public QuiescentChecker(Run run)
{
this(run, new Reconciler(run));
}
public QuiescentChecker(Run run, Reconciler reconciler)
{
this.clock = run.clock;
this.sut = run.sut;
this.reconciler = reconciler;
this.tracker = run.tracker;
this.schema = run.schemaSpec;
}
public void validate(Query query)
{
tracker.beginValidation(query.pd);
validate(() -> SelectHelper.execute(sut, clock, query), query);
tracker.endValidation(query.pd);
}
protected void validate(Supplier<List<ResultSetRow>> rowsSupplier, Query query)
{
List<ResultSetRow> actualRows = rowsSupplier.get();
PartitionState partitionState = reconciler.inflatePartitionState(query.pd, tracker, query);
validate(schema, partitionState, actualRows, query);
}
public static void validate(SchemaSpec schema, PartitionState partitionState, List<ResultSetRow> actualRows, Query query)
{
Set<ColumnSpec<?>> columns = new HashSet<>();
columns.addAll(schema.allColumns);
validate(schema, columns, partitionState, actualRows, query);
}
public static Reconciler.RowState adjustForSelection(Reconciler.RowState row, SchemaSpec schema, Set<ColumnSpec<?>> selection, boolean isStatic)
{
if (selection.size() == schema.allColumns.size())
return row;
List<ColumnSpec<?>> columns = isStatic ? schema.staticColumns : schema.regularColumns;
Reconciler.RowState newRowState = row.clone();
assert newRowState.vds.length == columns.size();
for (int i = 0; i < columns.size(); i++)
{
if (!selection.contains(columns.get(i)))
{
newRowState.vds[i] = UNSET_DESCR;
newRowState.lts[i] = NO_TIMESTAMP;
}
}
return newRowState;
}
public static void validate(SchemaSpec schema, Set<ColumnSpec<?>> selection, PartitionState partitionState, List<ResultSetRow> actualRows, Query query)
{
boolean isWildcardQuery = selection == null;
if (isWildcardQuery)
selection = new HashSet<>(schema.allColumns);
Iterator<ResultSetRow> actual = actualRows.iterator();
Collection<Reconciler.RowState> expectedRows = partitionState.rows(query.reverse);
Iterator<Reconciler.RowState> expected = expectedRows.iterator();
// It is possible that we only get a single row in response, and it is equal to static row
if (partitionState.isEmpty() && partitionState.staticRow() != null && actual.hasNext())
{
ResultSetRow actualRowState = actual.next();
if (actualRowState.cd != UNSET_DESCR && actualRowState.cd != partitionState.staticRow().cd)
throw new ValidationException(partitionState.toString(schema),
toString(actualRows),
"Found a row while model predicts statics only:" +
"\nExpected: %s" +
"\nActual: %s" +
"\nQuery: %s",
partitionState.staticRow().cd,
actualRowState, query.toSelectStatement());
for (int i = 0; i < actualRowState.vds.length; i++)
{
if (actualRowState.vds[i] != NIL_DESCR || actualRowState.lts[i] != NO_TIMESTAMP)
throw new ValidationException(partitionState.toString(schema),
toString(actualRows),
"Found a row while model predicts statics only:" +
"\nActual: %s" +
"\nQuery: %s",
actualRowState, query.toSelectStatement());
}
assertStaticRow(partitionState, actualRows,
adjustForSelection(partitionState.staticRow(), schema, selection, true),
actualRowState, query, schema, isWildcardQuery);
}
while (actual.hasNext() && expected.hasNext())
{
ResultSetRow actualRowState = actual.next();
Reconciler.RowState originalExpectedRowState = expected.next();
Reconciler.RowState expectedRowState = adjustForSelection(originalExpectedRowState, schema, selection, false);
// TODO: this is not necessarily true. It can also be that ordering is incorrect.
if (actualRowState.cd != UNSET_DESCR && actualRowState.cd != expectedRowState.cd)
throw new ValidationException(partitionState.toString(schema),
toString(actualRows),
"Found a row in the model that is not present in the resultset:" +
"\nExpected: %s" +
"\nActual: %s" +
"\nQuery: %s",
expectedRowState.toString(schema),
actualRowState, query.toSelectStatement());
if (!Arrays.equals(actualRowState.vds, expectedRowState.vds))
throw new ValidationException(partitionState.toString(schema),
toString(actualRows),
"Returned row state doesn't match the one predicted by the model:" +
"\nExpected: %s (%s)" +
"\nActual: %s (%s)." +
"\nQuery: %s",
descriptorsToString(expectedRowState.vds), expectedRowState.toString(schema),
descriptorsToString(actualRowState.vds), actualRowState,
query.toSelectStatement());
// Wildcard queries do not include timestamps
if (!isWildcardQuery && !Arrays.equals(actualRowState.lts, expectedRowState.lts))
throw new ValidationException(partitionState.toString(schema),
toString(actualRows),
"Timestamps in the row state don't match ones predicted by the model:" +
"\nExpected: %s (%s)" +
"\nActual: %s (%s)." +
"\nQuery: %s" +
"\nMax started: %d, Max finished: %d, %d reordered: %s",
Arrays.toString(expectedRowState.lts), expectedRowState.toString(schema),
Arrays.toString(actualRowState.lts), actualRowState,
query.toSelectStatement());
if (partitionState.staticRow() != null || actualRowState.sds != null || actualRowState.slts != null)
{
Reconciler.RowState expectedStaticRowState = adjustForSelection(partitionState.staticRow(), schema, selection, true);
assertStaticRow(partitionState, actualRows, expectedStaticRowState, actualRowState, query, schema, isWildcardQuery);
}
}
if (actual.hasNext() || expected.hasNext())
{
throw new ValidationException(partitionState.toString(schema),
toString(actualRows),
"Expected results to have the same number of results, but %s result iterator has more results." +
"\nExpected: %s" +
"\nActual: %s" +
"\nQuery: %s",
actual.hasNext() ? "actual" : "expected",
expectedRows,
actualRows,
query.toSelectStatement());
}
}
public static void assertStaticRow(PartitionState partitionState,
List<ResultSetRow> actualRows,
Reconciler.RowState staticRow,
ResultSetRow actualRowState,
Query query,
SchemaSpec schemaSpec,
boolean isWildcardQuery)
{
if (!Arrays.equals(staticRow.vds, actualRowState.sds))
throw new ValidationException(partitionState.toString(schemaSpec),
toString(actualRows),
"Returned static row state doesn't match the one predicted by the model:" +
"\nExpected: %s (%s)" +
"\nActual: %s (%s)." +
"\nQuery: %s",
descriptorsToString(staticRow.vds), staticRow.toString(schemaSpec),
descriptorsToString(actualRowState.sds), actualRowState,
query.toSelectStatement());
if (!isWildcardQuery && !Arrays.equals(staticRow.lts, actualRowState.slts))
throw new ValidationException(partitionState.toString(schemaSpec),
toString(actualRows),
"Timestamps in the static row state don't match ones predicted by the model:" +
"\nExpected: %s (%s)" +
"\nActual: %s (%s)." +
"\nQuery: %s",
Arrays.toString(staticRow.lts), staticRow.toString(schemaSpec),
Arrays.toString(actualRowState.slts), actualRowState,
query.toSelectStatement());
}
public static String descriptorsToString(long[] descriptors)
{
StringBuilder sb = new StringBuilder();
for (int i = 0; i < descriptors.length; i++)
{
if (descriptors[i] == NIL_DESCR)
sb.append("NIL");
if (descriptors[i] == UNSET_DESCR)
sb.append("UNSET");
else
sb.append(descriptors[i]);
if (i > 0)
sb.append(", ");
}
return sb.toString();
}
public static String toString(Collection<Reconciler.RowState> collection, SchemaSpec schema)
{
StringBuilder builder = new StringBuilder();
for (Reconciler.RowState rowState : collection)
builder.append(rowState.toString(schema)).append("\n");
return builder.toString();
}
public static String toString(List<ResultSetRow> collection)
{
StringBuilder builder = new StringBuilder();
for (ResultSetRow rowState : collection)
builder.append(rowState.toString()).append("\n");
return builder.toString();
}
}