harry-core/src/harry/model/QuiescentChecker.java (217 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package harry.model; import java.util.*; import java.util.function.Supplier; import harry.core.Run; import harry.data.ResultSetRow; import harry.ddl.ColumnSpec; import harry.ddl.SchemaSpec; import harry.model.sut.SystemUnderTest; import harry.reconciler.PartitionState; import harry.operations.Query; import harry.reconciler.Reconciler; import harry.runner.DataTracker; import static harry.generators.DataGenerators.NIL_DESCR; import static harry.generators.DataGenerators.UNSET_DESCR; public class QuiescentChecker implements Model { protected final OpSelectors.MonotonicClock clock; protected final DataTracker tracker; protected final SystemUnderTest sut; protected final Reconciler reconciler; protected final SchemaSpec schema; public QuiescentChecker(Run run) { this(run, new Reconciler(run)); } public QuiescentChecker(Run run, Reconciler reconciler) { this.clock = run.clock; this.sut = run.sut; this.reconciler = reconciler; this.tracker = run.tracker; this.schema = run.schemaSpec; } public void validate(Query query) { tracker.beginValidation(query.pd); validate(() -> SelectHelper.execute(sut, clock, query), query); tracker.endValidation(query.pd); } protected void validate(Supplier<List<ResultSetRow>> rowsSupplier, Query query) { List<ResultSetRow> actualRows = rowsSupplier.get(); PartitionState partitionState = reconciler.inflatePartitionState(query.pd, tracker, query); validate(schema, partitionState, actualRows, query); } public static void validate(SchemaSpec schema, PartitionState partitionState, List<ResultSetRow> actualRows, Query query) { Set<ColumnSpec<?>> columns = new HashSet<>(); columns.addAll(schema.allColumns); validate(schema, columns, partitionState, actualRows, query); } public static Reconciler.RowState adjustForSelection(Reconciler.RowState row, SchemaSpec schema, Set<ColumnSpec<?>> selection, boolean isStatic) { if (selection.size() == schema.allColumns.size()) return row; List<ColumnSpec<?>> columns = isStatic ? schema.staticColumns : schema.regularColumns; Reconciler.RowState newRowState = row.clone(); assert newRowState.vds.length == columns.size(); for (int i = 0; i < columns.size(); i++) { if (!selection.contains(columns.get(i))) { newRowState.vds[i] = UNSET_DESCR; newRowState.lts[i] = NO_TIMESTAMP; } } return newRowState; } public static void validate(SchemaSpec schema, Set<ColumnSpec<?>> selection, PartitionState partitionState, List<ResultSetRow> actualRows, Query query) { boolean isWildcardQuery = selection == null; if (isWildcardQuery) selection = new HashSet<>(schema.allColumns); Iterator<ResultSetRow> actual = actualRows.iterator(); Collection<Reconciler.RowState> expectedRows = partitionState.rows(query.reverse); Iterator<Reconciler.RowState> expected = expectedRows.iterator(); // It is possible that we only get a single row in response, and it is equal to static row if (partitionState.isEmpty() && partitionState.staticRow() != null && actual.hasNext()) { ResultSetRow actualRowState = actual.next(); if (actualRowState.cd != UNSET_DESCR && actualRowState.cd != partitionState.staticRow().cd) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Found a row while model predicts statics only:" + "\nExpected: %s" + "\nActual: %s" + "\nQuery: %s", partitionState.staticRow().cd, actualRowState, query.toSelectStatement()); for (int i = 0; i < actualRowState.vds.length; i++) { if (actualRowState.vds[i] != NIL_DESCR || actualRowState.lts[i] != NO_TIMESTAMP) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Found a row while model predicts statics only:" + "\nActual: %s" + "\nQuery: %s", actualRowState, query.toSelectStatement()); } assertStaticRow(partitionState, actualRows, adjustForSelection(partitionState.staticRow(), schema, selection, true), actualRowState, query, schema, isWildcardQuery); } while (actual.hasNext() && expected.hasNext()) { ResultSetRow actualRowState = actual.next(); Reconciler.RowState originalExpectedRowState = expected.next(); Reconciler.RowState expectedRowState = adjustForSelection(originalExpectedRowState, schema, selection, false); // TODO: this is not necessarily true. It can also be that ordering is incorrect. if (actualRowState.cd != UNSET_DESCR && actualRowState.cd != expectedRowState.cd) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Found a row in the model that is not present in the resultset:" + "\nExpected: %s" + "\nActual: %s" + "\nQuery: %s", expectedRowState.toString(schema), actualRowState, query.toSelectStatement()); if (!Arrays.equals(actualRowState.vds, expectedRowState.vds)) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Returned row state doesn't match the one predicted by the model:" + "\nExpected: %s (%s)" + "\nActual: %s (%s)." + "\nQuery: %s", descriptorsToString(expectedRowState.vds), expectedRowState.toString(schema), descriptorsToString(actualRowState.vds), actualRowState, query.toSelectStatement()); // Wildcard queries do not include timestamps if (!isWildcardQuery && !Arrays.equals(actualRowState.lts, expectedRowState.lts)) throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Timestamps in the row state don't match ones predicted by the model:" + "\nExpected: %s (%s)" + "\nActual: %s (%s)." + "\nQuery: %s" + "\nMax started: %d, Max finished: %d, %d reordered: %s", Arrays.toString(expectedRowState.lts), expectedRowState.toString(schema), Arrays.toString(actualRowState.lts), actualRowState, query.toSelectStatement()); if (partitionState.staticRow() != null || actualRowState.sds != null || actualRowState.slts != null) { Reconciler.RowState expectedStaticRowState = adjustForSelection(partitionState.staticRow(), schema, selection, true); assertStaticRow(partitionState, actualRows, expectedStaticRowState, actualRowState, query, schema, isWildcardQuery); } } if (actual.hasNext() || expected.hasNext()) { throw new ValidationException(partitionState.toString(schema), toString(actualRows), "Expected results to have the same number of results, but %s result iterator has more results." + "\nExpected: %s" + "\nActual: %s" + "\nQuery: %s", actual.hasNext() ? "actual" : "expected", expectedRows, actualRows, query.toSelectStatement()); } } public static void assertStaticRow(PartitionState partitionState, List<ResultSetRow> actualRows, Reconciler.RowState staticRow, ResultSetRow actualRowState, Query query, SchemaSpec schemaSpec, boolean isWildcardQuery) { if (!Arrays.equals(staticRow.vds, actualRowState.sds)) throw new ValidationException(partitionState.toString(schemaSpec), toString(actualRows), "Returned static row state doesn't match the one predicted by the model:" + "\nExpected: %s (%s)" + "\nActual: %s (%s)." + "\nQuery: %s", descriptorsToString(staticRow.vds), staticRow.toString(schemaSpec), descriptorsToString(actualRowState.sds), actualRowState, query.toSelectStatement()); if (!isWildcardQuery && !Arrays.equals(staticRow.lts, actualRowState.slts)) throw new ValidationException(partitionState.toString(schemaSpec), toString(actualRows), "Timestamps in the static row state don't match ones predicted by the model:" + "\nExpected: %s (%s)" + "\nActual: %s (%s)." + "\nQuery: %s", Arrays.toString(staticRow.lts), staticRow.toString(schemaSpec), Arrays.toString(actualRowState.slts), actualRowState, query.toSelectStatement()); } public static String descriptorsToString(long[] descriptors) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < descriptors.length; i++) { if (descriptors[i] == NIL_DESCR) sb.append("NIL"); if (descriptors[i] == UNSET_DESCR) sb.append("UNSET"); else sb.append(descriptors[i]); if (i > 0) sb.append(", "); } return sb.toString(); } public static String toString(Collection<Reconciler.RowState> collection, SchemaSpec schema) { StringBuilder builder = new StringBuilder(); for (Reconciler.RowState rowState : collection) builder.append(rowState.toString(schema)).append("\n"); return builder.toString(); } public static String toString(List<ResultSetRow> collection) { StringBuilder builder = new StringBuilder(); for (ResultSetRow rowState : collection) builder.append(rowState.toString()).append("\n"); return builder.toString(); } }