in stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowShardColumnIterator.java [181:441]
public void advance() {
if (logger.isTraceEnabled()) logger.trace( "Advancing multi row column iterator" );
/**
* If the edge is present, we need to being seeking from this
*/
final boolean skipFirstColumn = startColumn != null;
final int selectSize = skipFirstColumn ? pageSize + 1 : pageSize;
final RangeBuilder rangeBuilder = new RangeBuilder();
SmartShard startShard = null;
if(currentShardIterator == null){
// create a copy that we use to search for our 'starting shard'
final List<SmartShard> shards = new ArrayList<>(rowKeysWithShardEnd);
// flip the order of our shards if ascending
if(ascending){
Collections.reverse(rowKeysWithShardEnd);
}
if(lastTimestamp.isPresent()) {
//always seek from 0 to find out where our cursor last should fall
Collections.reverse(shards);
for ( SmartShard shard : shards){
if ( lastTimestamp.get().compareTo(shard.getShardIndex()) > 0) {
startShard = shard;
}
}
}
currentShardIterator = rowKeysWithShardEnd.iterator();
}
if(currentShard == null){
if(logger.isTraceEnabled()){
logger.trace("currentShard: {}", currentShard);
}
currentShard = currentShardIterator.next();
if (startShard != null){
while(!currentShard.equals(startShard)){
currentShard = currentShardIterator.next();
}
}
// skip over shards that are marked deleted
while ( currentShard.isDeleted() && currentShardIterator.hasNext() ){
if(logger.isTraceEnabled()){
logger.trace("Shard is marked deleted - {}", currentShard);
}
currentShard = currentShardIterator.next();
}
if(logger.isTraceEnabled()){
logger.trace("all shards when starting: {}", rowKeysWithShardEnd);
logger.trace("initializing iterator with shard: {}", currentShard);
}
}
// initial request, build the range with no start and no end
if ( startColumn == null && currentShard.getShardEnd() == null ){
columnSearch.buildRange( rangeBuilder );
if(logger.isTraceEnabled()){
logger.trace("initial search (no start or shard end)");
}
}
// if there's only a startColumn set the range start startColumn always
else if ( startColumn != null && currentShard.getShardEnd() == null ){
columnSearch.buildRange( rangeBuilder, startColumn, null );
if(logger.isTraceEnabled()){
logger.trace("search (no shard end) with start: {}", startColumn);
}
}
// if there's only a shardEnd, set the start/end according based on the search order
else if ( startColumn == null && currentShard.getShardEnd() != null ){
T shardEnd = (T) currentShard.getShardEnd();
// if we have a shardEnd and it's not an ascending search, use the shardEnd as a start
if(!ascending) {
columnSearch.buildRange(rangeBuilder, shardEnd, null);
if(logger.isTraceEnabled()){
logger.trace("search descending with start: {}", shardEnd);
}
}
// if we have a shardEnd and it is an ascending search, use the shardEnd as the end
else{
columnSearch.buildRange( rangeBuilder, null, shardEnd );
if(logger.isTraceEnabled()){
logger.trace("search ascending with end: {}", shardEnd);
}
}
}
// if there's both a startColumn and a shardEnd, decide which should be used as start/end based on search order
else if ( startColumn != null && currentShard.getShardEnd() != null) {
T shardEnd = (T) currentShard.getShardEnd();
// if the search is not ascending, set the start to be the older edge
if(!ascending){
T searchStart = comparator.compare(shardEnd, startColumn) > 0 ? shardEnd : startColumn;
columnSearch.buildRange( rangeBuilder, searchStart, null);
if(logger.isTraceEnabled()){
logger.trace("search descending with start: {} in shard", searchStart, currentShard);
}
}
// if the search is ascending, then always use the startColumn for the start and shardEnd for the range end
else{
columnSearch.buildRange( rangeBuilder, startColumn , shardEnd);
if(logger.isTraceEnabled()){
logger.trace("search with start: {}, end: {}", startColumn, shardEnd);
}
}
}
rangeBuilder.setLimit( selectSize );
if (logger.isTraceEnabled()) logger.trace( "Executing cassandra query with shard {}", currentShard );
/**
* Get our list of slices
*/
final RowSliceQuery<R, C> query =
keyspace.prepareQuery( cf ).setConsistencyLevel( consistencyLevel ).getKeySlice( (R) currentShard.getRowKey() )
.withColumnRange( rangeBuilder.build() );
final Rows<R, C> result;
try {
result = query.execute().getResult();
}
catch ( ConnectionException e ) {
throw new RuntimeException( "Unable to connect to casandra", e );
}
final List<T> mergedResults;
skipSize = 0;
mergedResults = processResults( result, selectSize );
if(logger.isTraceEnabled()){
logger.trace("skipped amount: {}", skipSize);
}
final int size = mergedResults.size();
if(logger.isTraceEnabled()){
logger.trace("current shard: {}, retrieved size: {}", currentShard, size);
logger.trace("selectSize={}, size={}, ", selectSize, size);
}
moreToReturn = size == selectSize;
if(selectSize == 1001 && mergedResults.size() == 1000){
moreToReturn = true;
}
// if a whole page is skipped OR the result size equals the the difference of what's skipped,
// it is likely during a shard transition and we should assume there is more to read
if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize || size == (selectSize -1) - skipSize ){
moreToReturn = true;
}
//we have a first column to to check
if( size > 0) {
final T firstResult = mergedResults.get( 0 );
//The search has either told us to skip the first element, or it matches our last, therefore we disregard it
if(columnSearch.skipFirst( firstResult ) || (skipFirstColumn && comparator.compare( startColumn, firstResult ) == 0)){
if(logger.isTraceEnabled()){
logger.trace("removing an entry");
}
mergedResults.remove( 0 );
}
}
// set the start column for the enxt query
if(moreToReturn && mergedResults.size() > 0){
startColumn = mergedResults.get( mergedResults.size() - 1 );
}
currentColumnIterator = mergedResults.iterator();
//force an advance of this iterator when there are still shards to read but result set on current shard is 0
if(size == 0 && currentShardIterator.hasNext()){
hasNext();
}
if(logger.isTraceEnabled()){
logger.trace("currentColumnIterator.hasNext()={}, " +
"moreToReturn={}, currentShardIterator.hasNext()={}",
currentColumnIterator.hasNext(), moreToReturn, currentShardIterator.hasNext());
}
}