public static List splitReadIntoAccordAndNormal()

in src/java/org/apache/cassandra/service/consensus/migration/ConsensusRequestRouter.java [681:783]


    /**
     * Splits a partition range read into contiguous sub-reads, each tagged with the system
     * ({@link RangeReadTarget#accord} or {@link RangeReadTarget#normal}) that should execute it.
     * <p>
     * The whole read is returned unsplit when:
     * <ul>
     *   <li>the coordinator is not eventually consistent, or the table metadata is missing, or neither
     *       the current transactional mode nor the mode being migrated from reads non-serial reads
     *       through Accord (target {@code normal});</li>
     *   <li>there is no {@link TableMigrationState} for the table (target chosen from the current mode
     *       when no migration is configured, otherwise from the mode being migrated from);</li>
     *   <li>the read's key range lies entirely at or before the start of the first migrated range, or
     *       does not intersect any migrated range (target of the mode being migrated from).</li>
     * </ul>
     * Otherwise the key range is walked against the migrated ranges: intersections get the
     * "migrating to" target and whatever is left over gets the "migrating from" target.
     *
     * @param cm              cluster metadata supplying the table metadata and consensus migration state
     * @param read            the range read to split
     * @param readCoordinator coordinator for the read; only its eventual-consistency flag is consulted here
     * @param requestTime     not used by this method
     * @return the sub-reads in order; an immutable single-element list on the unsplit paths, otherwise a
     *         mutable list whose first element starts at the read's start key and whose last element
     *         ends at the read's stop key
     * @throws IllegalStateException if the post-split sanity checks fail (assuming {@code checkState}
     *         is Guava's {@code Preconditions.checkState} - confirm against the file's imports)
     */
    public static List<RangeReadWithTarget> splitReadIntoAccordAndNormal(ClusterMetadata cm, PartitionRangeReadCommand read, ReadCoordinator readCoordinator, Dispatcher.RequestTime requestTime)
    {
        if (!readCoordinator.isEventuallyConsistent())
            return ImmutableList.of(new RangeReadWithTarget(read, RangeReadTarget.normal));
        TableMetadata tm = getTableMetadata(cm, read.metadata().id);
        if (tm == null || (!tm.params.transactionalMode.nonSerialReadsThroughAccord && !tm.params.transactionalMigrationFrom.nonSerialReadsThroughAccord()))
            return ImmutableList.of(new RangeReadWithTarget(read, RangeReadTarget.normal));

        List<RangeReadWithTarget> result = null;
        TransactionalMode transactionalMode = tm.params.transactionalMode;
        TransactionalMigrationFromMode transactionalMigrationFromMode = tm.params.transactionalMigrationFrom;
        boolean transactionalModeReadsThroughAccord = transactionalMode.nonSerialReadsThroughAccord;
        RangeReadTarget migrationToTarget = transactionalModeReadsThroughAccord ? RangeReadTarget.accord : RangeReadTarget.normal;
        boolean migrationFromReadsThroughAccord = transactionalMigrationFromMode.nonSerialReadsThroughAccord();
        RangeReadTarget migrationFromTarget = migrationFromReadsThroughAccord ? RangeReadTarget.accord : RangeReadTarget.normal;
        TableMigrationState tms = cm.consensusMigrationState.tableStates.get(tm.id);
        if (tms == null)
        {
            if (transactionalMigrationFromMode == TransactionalMigrationFromMode.none)
                // There is no migration and no TMS so do what the schema says since no migration should be required
                return ImmutableList.of(new RangeReadWithTarget(read, migrationToTarget));
            else
                // If we are migrating from something and there is no migration state the migration hasn't begun
                // so continue to do what we are migrating from does until the range is marked as migrating
                return ImmutableList.of(new RangeReadWithTarget(read, migrationFromTarget));
        }

        // AbstractBounds can potentially be left/right inclusive while Range used to track migration is only right inclusive
        // The right way to tackle this seems to be to find the tokens that intersect the key range and then split
        // until nothing intersects
        AbstractBounds<PartitionPosition> keyRange = read.dataRange().keyRange();
        AbstractBounds<PartitionPosition> remainder = keyRange;

        // Migrating to Accord we only read through Accord when the range is fully migrated, but migrating back
        // we stop reading from Accord as soon as the range is marked migrating and do key migration on read
        NormalizedRanges<Token> migratedRanges = transactionalModeReadsThroughAccord ? tms.migratedRanges : tms.migratingAndMigratedRanges;

        // Add the preceding range if any
        if (!migratedRanges.isEmpty())
        {
            Token firstMigratingToken = migratedRanges.get(0).left.getToken();
            int leftCmp = keyRange.left.getToken().compareTo(firstMigratingToken);
            int rightCmp = compareRightToken(keyRange.right.getToken(), firstMigratingToken);
            if (leftCmp <= 0)
            {
                // The whole read ends at or before the first migrated range, nothing to split
                if (rightCmp <= 0)
                    return ImmutableList.of(new RangeReadWithTarget(read, migrationFromTarget));
                // rightCmp > 0 here, so the preceding range always ends at the start of the first migrated range
                AbstractBounds<PartitionPosition> precedingRange = keyRange.withNewRight(firstMigratingToken.maxKeyBound());
                // Could be an empty bound, it's fine to let a min KeyBound and max KeyBound through as that isn't empty
                if (!precedingRange.left.equals(precedingRange.right))
                {
                    result = new ArrayList<>();
                    result.add(new RangeReadWithTarget(read.forSubRange(precedingRange, true), migrationFromTarget));
                }
            }
        }

        // NOTE(review): this flag is set whenever a migrated sub-range is emitted, even when migrationToTarget
        // is normal - confirm the intended condition before wiring up the barrier below
        boolean hadAccordReads = false;
        for (Range<Token> r : migratedRanges)
        {
            Pair<AbstractBounds<PartitionPosition>, AbstractBounds<PartitionPosition>> intersectionAndRemainder = Range.intersectionAndRemainder(remainder, r);
            if (intersectionAndRemainder.left != null)
            {
                if (result == null)
                    result = new ArrayList<>();
                // Only the very first sub-read is created with the boolean flag set
                PartitionRangeReadCommand subRead = read.forSubRange(intersectionAndRemainder.left, result.isEmpty());
                result.add(new RangeReadWithTarget(subRead, migrationToTarget));
                hadAccordReads = true;
            }
            remainder = intersectionAndRemainder.right;
            if (remainder == null)
                break;
        }

        if (remainder != null)
        {
            if (result != null)
                result.add(new RangeReadWithTarget(read.forSubRange(remainder, false), migrationFromTarget));
            else
                // Nothing intersected any migrated range, so the remainder is the only sub-read
                return ImmutableList.of(new RangeReadWithTarget(read.forSubRange(remainder, true), migrationFromTarget));
        }

        // Sanity checks: the sub-reads must cover the original range end to end, be adjacent, and alternate targets
        checkState(result != null && !result.isEmpty(), "Shouldn't have null or empty result");
        checkState(result.get(0).read.dataRange().startKey().equals(read.dataRange().startKey()), "Split reads should encompass entire range");
        checkState(result.get(result.size() - 1).read.dataRange().stopKey().equals(read.dataRange().stopKey()), "Split reads should encompass entire range");
        for (int i = 0; i < result.size() - 1; i++)
        {
            checkState(result.get(i).read.dataRange().stopKey().equals(result.get(i + 1).read.dataRange().startKey()), "Split reads should all be adjacent");
            checkState(result.get(i).target != result.get(i + 1).target, "Split reads should be for different targets");
        }

        //TODO (later): https://issues.apache.org/jira/browse/CASSANDRA-20211 Range reads could use a barrier
        if (hadAccordReads)
        {
            // do barrier
        }

        return result;
    }