in stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java [150:255]
/**
 * Collects up to {@code params.limit} message ids for a queue by slicing the
 * {@code QUEUE_INBOX} column family one shard row at a time.
 *
 * <p>The scan begins at {@code params.startId}, or at the appropriate end of {@code bounds}
 * when no start id is supplied, and runs towards the opposite end of {@code bounds}. Each
 * shard row is keyed by {@code getQueueShardRowKey(queueId, shardTimestamp)}, and the shard
 * timestamp moves by {@code QUEUE_SHARD_INTERVAL} after every row.
 *
 * @param queueId the queue whose inbox rows are read
 * @param bounds the oldest/newest message ids of the queue; must not be {@code null}
 * @param params search parameters ({@code startId}, {@code reversed}, {@code limit},
 *        {@code skipFirst})
 * @return the ids found, in the order returned by the slice queries; empty when the queue
 *         has no first or last message
 * @throws QueueException if {@code bounds} is {@code null}
 * @throws IllegalArgumentException if the start id orders after the finish id according to
 *         {@code MessageIdComparator}
 */
protected List<UUID> getQueueRange( UUID queueId, QueueBounds bounds, SearchParam params ) {
    if ( bounds == null ) {
        logger.error( "Necessary queue bounds not found" );
        throw new QueueException( "Necessary queue bounds not found" );
    }

    UUID finish_uuid = params.reversed ? bounds.getOldest() : bounds.getNewest();
    List<UUID> results = new ArrayList<>( params.limit );

    // Fall back to the queue bounds when the caller did not supply a start id.
    UUID start = params.startId;
    if ( start == null ) {
        start = params.reversed ? bounds.getNewest() : bounds.getOldest();
    }
    if ( start == null ) {
        logger.error( "No first message in queue" );
        return results;
    }
    if ( finish_uuid == null ) {
        logger.error( "No last message in queue" );
        return results;
    }

    long start_ts_shard = roundLong( getTimestampInMillis( start ), QUEUE_SHARD_INTERVAL );
    long finish_ts_shard = roundLong( getTimestampInMillis( finish_uuid ), QUEUE_SHARD_INTERVAL );

    // NOTE(review): for reversed scans the first shard read is the finish shard and the loop
    // guard below requires start_ts_shard <= current <= finish_ts_shard. If a reversed start
    // id is newer than the finish id, this presumably only iterates when both ids fall into
    // the same shard -- confirm against MessageIdComparator and the shard layout.
    long current_ts_shard = params.reversed ? finish_ts_shard : start_ts_shard;

    final MessageIdComparator comparator = new MessageIdComparator( params.reversed );

    // should be start < finish
    if ( comparator.compare( start, finish_uuid ) > 0 ) {
        logger.warn( "Tried to perform a slice with start UUID {} after finish UUID {}.", start, finish_uuid );
        throw new IllegalArgumentException(
                String.format( "You cannot specify a start value of %s after finish value of %s", start,
                        finish_uuid ) );
    }

    UUID lastValue = start;
    boolean firstPage = true;

    // The start/finish ordering was validated above and neither value changes inside the
    // loop, so only the shard window needs to be re-checked on each iteration.
    while ( ( current_ts_shard >= start_ts_shard ) && ( current_ts_shard <= finish_ts_shard ) ) {
        if ( logger.isDebugEnabled() ) {
            logger.debug( "Starting search with start UUID {}, finish UUID {}, and reversed {}",
                    lastValue, finish_uuid, params.reversed );
        }

        SliceQuery<ByteBuffer, UUID, ByteBuffer> q = createSliceQuery( ko, be, ue, be );
        q.setColumnFamily( QUEUE_INBOX.getColumnFamily() );
        q.setKey( getQueueShardRowKey( queueId, current_ts_shard ) );
        // Ask for one extra column because the first column may be skipped below.
        q.setRange( lastValue, finish_uuid, params.reversed, params.limit + 1 );

        final List<HColumn<UUID, ByteBuffer>> cassResults = swallowOrderedExecution( q );

        for ( int i = 0; i < cassResults.size(); i++ ) {
            HColumn<UUID, ByteBuffer> column = cassResults.get( i );
            final UUID columnName = column.getName();

            // Skip a column we have already seen: either the caller asked to skip the start id
            // (only possible when a start id was actually supplied, hence the null guard), or
            // the column repeats the last id collected so far.
            final boolean isSkippedStart = i == 0 && firstPage && params.skipFirst
                    && params.startId != null && params.startId.equals( columnName );
            final boolean isAlreadyCollected =
                    !firstPage && lastValue != null && lastValue.equals( columnName );
            if ( isSkippedStart || isAlreadyCollected ) {
                continue;
            }

            lastValue = columnName;
            results.add( columnName );
            if ( logger.isDebugEnabled() ) {
                // Log the id that was actually added, not the fixed scan start.
                logger.debug( "Added id '{}' to result set for queue id '{}'", columnName, queueId );
            }

            if ( results.size() >= params.limit ) {
                return results;
            }
            firstPage = false;
        }

        // Move on to the neighbouring shard row in scan direction.
        if ( params.reversed ) {
            current_ts_shard -= QUEUE_SHARD_INTERVAL;
        }
        else {
            current_ts_shard += QUEUE_SHARD_INTERVAL;
        }
    }

    return results;
}