in src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java [681:783]
/**
 * Splits a partition range read into one or more sub-reads, each tagged with the {@link RangeReadTarget}
 * ({@code accord} or {@code normal}) it should be executed against, based on the table's transactional mode,
 * the mode it is migrating from, and the table's consensus migration state in {@code cm}.
 * <p>
 * The whole read is returned unsplit, targeted at {@code normal}, when the coordinator is not eventually
 * consistent, when no table metadata is found, or when neither the current transactional mode nor the
 * migration-from mode performs non-serial reads through Accord. When the table has no migration state the whole
 * read is returned unsplit with the target implied by the schema. Otherwise the read's key range is split
 * against the migrated ranges: pieces intersecting a migrated range get the migration-to target, everything else
 * gets the migration-from target.
 *
 * @param cm              cluster metadata supplying the table metadata lookup and the consensus migration state
 * @param read            the range read to split
 * @param readCoordinator consulted only for {@code isEventuallyConsistent()}
 * @param requestTime     not referenced by this method
 * @return the sub-reads with their targets; when the read is split, the pieces are checked to start and stop at
 *         the original read's start/stop keys, to be adjacent, and to alternate targets
 */
public static List<RangeReadWithTarget> splitReadIntoAccordAndNormal(ClusterMetadata cm, PartitionRangeReadCommand read, ReadCoordinator readCoordinator, Dispatcher.RequestTime requestTime)
{
    if (!readCoordinator.isEventuallyConsistent())
        return ImmutableList.of(new RangeReadWithTarget(read, RangeReadTarget.normal));

    TableMetadata tm = getTableMetadata(cm, read.metadata().id);
    if (tm == null || (!tm.params.transactionalMode.nonSerialReadsThroughAccord && !tm.params.transactionalMigrationFrom.nonSerialReadsThroughAccord()))
        return ImmutableList.of(new RangeReadWithTarget(read, RangeReadTarget.normal));

    TransactionalMode transactionalMode = tm.params.transactionalMode;
    TransactionalMigrationFromMode transactionalMigrationFromMode = tm.params.transactionalMigrationFrom;
    boolean transactionalModeReadsThroughAccord = transactionalMode.nonSerialReadsThroughAccord;
    // Target for ranges that count as migrated, and target for everything else
    RangeReadTarget migrationToTarget = transactionalModeReadsThroughAccord ? RangeReadTarget.accord : RangeReadTarget.normal;
    RangeReadTarget migrationFromTarget = transactionalMigrationFromMode.nonSerialReadsThroughAccord() ? RangeReadTarget.accord : RangeReadTarget.normal;

    TableMigrationState tms = cm.consensusMigrationState.tableStates.get(tm.id);
    if (tms == null)
    {
        // With no migration and no TMS do what the schema says since no migration should be required.
        // If we are migrating from something and there is no migration state the migration hasn't begun,
        // so continue to do what we are migrating from does until the range is marked as migrating.
        RangeReadTarget target = transactionalMigrationFromMode == TransactionalMigrationFromMode.none
                                 ? migrationToTarget
                                 : migrationFromTarget;
        return ImmutableList.of(new RangeReadWithTarget(read, target));
    }

    // AbstractBounds can potentially be left/right inclusive while Range used to track migration is only right inclusive
    // The right way to tackle this seems to be to find the tokens that intersect the key range and then split
    // until nothing intersects
    AbstractBounds<PartitionPosition> keyRange = read.dataRange().keyRange();
    AbstractBounds<PartitionPosition> remainder = keyRange;

    // Migrating to Accord we only read through Accord when the range is fully migrated, but migrating back
    // we stop reading from Accord as soon as the range is marked migrating and do key migration on read
    NormalizedRanges<Token> migratedRanges = transactionalModeReadsThroughAccord ? tms.migratedRanges : tms.migratingAndMigratedRanges;

    List<RangeReadWithTarget> result = null;

    // Add the preceding range if any
    if (!migratedRanges.isEmpty())
    {
        Token firstMigratingToken = migratedRanges.get(0).left.getToken();
        int leftCmp = keyRange.left.getToken().compareTo(firstMigratingToken);
        int rightCmp = compareRightToken(keyRange.right.getToken(), firstMigratingToken);
        if (leftCmp <= 0)
        {
            // The entire read sits at or before the first migrated range, nothing to split
            if (rightCmp <= 0)
                return ImmutableList.of(new RangeReadWithTarget(read, migrationFromTarget));

            // rightCmp > 0 from here on, so the preceding piece always ends at the first migrated token
            AbstractBounds<PartitionPosition> precedingRange = keyRange.withNewRight(firstMigratingToken.maxKeyBound());
            // Could be an empty bound, it's fine to let a min KeyBound and max KeyBound through as that isn't empty
            if (!precedingRange.left.equals(precedingRange.right))
            {
                result = new ArrayList<>();
                result.add(new RangeReadWithTarget(read.forSubRange(precedingRange, true), migrationFromTarget));
            }
        }
    }

    // NOTE(review): this flag is set for every piece that intersects a migrated range, including when
    // migrationToTarget is normal (migrating away from Accord) - confirm intent before wiring up the barrier
    boolean hadAccordReads = false;
    for (Range<Token> r : migratedRanges)
    {
        Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> intersectionAndRemainder = Range.intersectionAndRemainder(remainder, r);
        AbstractBounds<PartitionPosition> intersection = intersectionAndRemainder.left;
        if (intersection != null)
        {
            if (result == null)
                result = new ArrayList<>();
            // The boolean passed to forSubRange is true only for the first sub-read emitted
            PartitionRangeReadCommand subRead = read.forSubRange(intersection, result.isEmpty());
            result.add(new RangeReadWithTarget(subRead, migrationToTarget));
            hadAccordReads = true;
        }
        remainder = intersectionAndRemainder.right;
        if (remainder == null)
            break;
    }

    // Whatever is left after the migrated ranges goes to the migration-from target
    if (remainder != null)
    {
        if (result == null)
            return ImmutableList.of(new RangeReadWithTarget(read.forSubRange(remainder, true), migrationFromTarget));
        result.add(new RangeReadWithTarget(read.forSubRange(remainder, false), migrationFromTarget));
    }

    // Sanity check the split: covers the original range, pieces are adjacent, and targets alternate
    checkState(result != null && !result.isEmpty(), "Shouldn't have null or empty result");
    checkState(result.get(0).read.dataRange().startKey().equals(read.dataRange().startKey()), "Split reads should encompass entire range");
    checkState(result.get(result.size() - 1).read.dataRange().stopKey().equals(read.dataRange().stopKey()), "Split reads should encompass entire range");
    for (int i = 0; i < result.size() - 1; i++)
    {
        RangeReadWithTarget current = result.get(i);
        RangeReadWithTarget next = result.get(i + 1);
        checkState(current.read.dataRange().stopKey().equals(next.read.dataRange().startKey()), "Split reads should all be adjacent");
        checkState(current.target != next.target, "Split reads should be for different targets");
    }

    //TODO (later): https://issues.apache.org/jira/browse/CASSANDRA-20211 Range reads could use a barrier
    if (hadAccordReads)
    {
        // do barrier
    }
    return result;
}