public void advance()

in stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java [181:441]


    public void advance() {

        if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );

        /**
         * If the edge is present, we need to being seeking from this
         */

        final boolean skipFirstColumn = startColumn != null;

        final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;

        final RangeBuilder rangeBuilder = new RangeBuilder();

        SmartShard startShard = null;



        if(currentShardIterator == null){

            // create a copy that we use to search for our 'starting shard'
            final List<SmartShard> shards = new ArrayList<>(rowKeysWithShardEnd);


            // flip the order of our shards if ascending
            if(ascending){
                Collections.reverse(rowKeysWithShardEnd);
            }


            if(lastTimestamp.isPresent()) {

                //always seek from 0 to find out where our cursor last should fall
                Collections.reverse(shards);

                for ( SmartShard shard : shards){

                    if ( lastTimestamp.get().compareTo(shard.getShardIndex()) > 0) {
                        startShard = shard;
                    }

                }

            }

            currentShardIterator = rowKeysWithShardEnd.iterator();

        }

        if(currentShard == null){

            if(logger.isTraceEnabled()){
                logger.trace("currentShard: {}", currentShard);
            }

            currentShard = currentShardIterator.next();

            if (startShard != null){
                while(!currentShard.equals(startShard)){
                    currentShard = currentShardIterator.next();
                }
            }

            // skip over shards that are marked deleted
            while ( currentShard.isDeleted() && currentShardIterator.hasNext() ){

                if(logger.isTraceEnabled()){
                    logger.trace("Shard is marked deleted - {}", currentShard);
                }

                currentShard = currentShardIterator.next();
            }


            if(logger.isTraceEnabled()){
                logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
                logger.trace("initializing iterator with shard: {}", currentShard);
            }


        }



        // initial request, build the range with no start and no end
        if ( startColumn == null && currentShard.getShardEnd() == null ){

            columnSearch.buildRange( rangeBuilder );

            if(logger.isTraceEnabled()){
                logger.trace("initial search (no start or shard end)");
            }

        }
        // if there's only a startColumn set the range start startColumn always
        else if ( startColumn != null && currentShard.getShardEnd() == null ){

            columnSearch.buildRange( rangeBuilder, startColumn, null );

            if(logger.isTraceEnabled()){
                logger.trace("search (no shard end) with start: {}", startColumn);
            }

        }
        // if there's only a shardEnd, set the start/end according based on the search order
        else if ( startColumn == null && currentShard.getShardEnd() != null ){

            T shardEnd = (T) currentShard.getShardEnd();

            // if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
            if(!ascending) {

                columnSearch.buildRange(rangeBuilder, shardEnd, null);

                if(logger.isTraceEnabled()){
                    logger.trace("search descending with start: {}", shardEnd);
                }

            }
            // if we have a shardEnd and it is an ascending search, use the shardEnd as the end
            else{

                columnSearch.buildRange( rangeBuilder, null, shardEnd );

                if(logger.isTraceEnabled()){
                    logger.trace("search ascending with end: {}", shardEnd);
                }

            }

        }
        // if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
        else if ( startColumn != null && currentShard.getShardEnd() != null) {

            T shardEnd = (T) currentShard.getShardEnd();


            // if the search is not ascending, set the start to be the older edge
            if(!ascending){

                T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
                columnSearch.buildRange( rangeBuilder, searchStart, null);

                if(logger.isTraceEnabled()){
                    logger.trace("search descending with start: {} in shard", searchStart, currentShard);
                }

            }
            // if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
            else{

                columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);

                if(logger.isTraceEnabled()){
                    logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
                }



            }

        }

        rangeBuilder.setLimit( selectSize );

        if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );

        /**
         * Get our list of slices
         */
        final RowSliceQuery<R, C> query =
            keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
                .withColumnRange( rangeBuilder.build() );

        final Rows<R, C> result;
        try {
            result = query.execute().getResult();
        }
        catch ( ConnectionException e ) {
            throw new RuntimeException( "Unable to connect to casandra", e );
        }




        final List<T> mergedResults;

        skipSize = 0;

        mergedResults = processResults( result, selectSize );

        if(logger.isTraceEnabled()){
            logger.trace("skipped amount: {}", skipSize);
        }



        final int size = mergedResults.size();



        if(logger.isTraceEnabled()){
            logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
            logger.trace("selectSize={}, size={}, ", selectSize, size);


        }

        moreToReturn = size == selectSize;

        if(selectSize == 1001 && mergedResults.size() == 1000){
            moreToReturn = true;
        }


        // if a whole page is skipped OR the result size equals the the difference of what's skipped,
        // it is likely during a shard transition and we should assume there is more to read
        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
            moreToReturn = true;
        }

        //we have a first column to to check
        if( size > 0) {

            final T firstResult = mergedResults.get( 0 );

            //The search has either told us to skip the first element, or it matches our last, therefore we disregard it
            if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
                if(logger.isTraceEnabled()){
                    logger.trace("removing an entry");

                }
                mergedResults.remove( 0 );
            }

        }


        // set the start column for the enxt query
        if(moreToReturn && mergedResults.size() > 0){
            startColumn = mergedResults.get( mergedResults.size()  - 1 );

        }


        currentColumnIterator = mergedResults.iterator();


        //force an advance of this iterator when there are still shards to read but result set on current shard is 0
        if(size == 0 && currentShardIterator.hasNext()){
            hasNext();
        }

        if(logger.isTraceEnabled()){
            logger.trace("currentColumnIterator.hasNext()={}, " +
                    "moreToReturn={}, currentShardIterator.hasNext()={}",
                currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
        }


    }