protected List getQueueRange()

in stack/core/src/main/java/org/apache/usergrid/mq/cassandra/io/AbstractSearch.java [150:255]


    /**
     * Collects message ids for a queue by querying its inbox shard rows one after another, beginning with the
     * shard that contains the start id and moving one {@code QUEUE_SHARD_INTERVAL} at a time towards the shard
     * that contains the finish id.
     *
     * <p>The start id is {@code params.startId} when supplied; otherwise it is the newest bound for a reversed
     * search and the oldest bound for a forward search. The finish id is the opposite bound.</p>
     *
     * @param queueId the queue whose inbox shards are read
     * @param bounds the oldest/newest message ids of the queue; must not be {@code null}
     * @param params search parameters; this method reads {@code startId}, {@code reversed}, {@code limit} and
     * {@code skipFirst}
     *
     * @return the ids in the order they were read; the search stops as soon as the list holds
     * {@code params.limit} entries. The list is empty when the bounds carry no start or finish id.
     *
     * @throws QueueException if {@code bounds} is {@code null}
     * @throws IllegalArgumentException if the start id sorts after the finish id for the requested direction
     */
    protected List<UUID> getQueueRange( UUID queueId, QueueBounds bounds, SearchParam params ) {

        if ( bounds == null ) {
            logger.error( "Necessary queue bounds not found" );
            throw new QueueException( "Necessary queue bounds not found" );
        }

        final UUID finishId = params.reversed ? bounds.getOldest() : bounds.getNewest();

        final List<UUID> results = new ArrayList<>( params.limit );

        UUID start = params.startId;

        if ( start == null ) {
            start = params.reversed ? bounds.getNewest() : bounds.getOldest();
        }

        if ( start == null ) {
            logger.error( "No first message in queue" );
            return results;
        }

        if ( finishId == null ) {
            logger.error( "No last message in queue" );
            return results;
        }

        final long startShard = roundLong( getTimestampInMillis( start ), QUEUE_SHARD_INTERVAL );

        final long finishShard = roundLong( getTimestampInMillis( finishId ), QUEUE_SHARD_INTERVAL );

        // A reversed search runs from the newest id down to the oldest one, so its start shard is the upper
        // bound of the walk and its finish shard the lower bound (assuming shard values grow with message
        // time). Deriving the bounds from the direction keeps a reversed search that spans several shards
        // from ending before its first query.
        final long lowestShard = params.reversed ? finishShard : startShard;
        final long highestShard = params.reversed ? startShard : finishShard;

        final MessageIdComparator comparator = new MessageIdComparator( params.reversed );


        //should be start < finish
        if ( comparator.compare( start, finishId ) > 0 ) {
            logger.warn( "Tried to perform a slice with start UUID {} after finish UUID {}.", start, finishId );
            throw new IllegalArgumentException(
                    String.format( "You cannot specify a start value of %s after finish value of %s", start,
                            finishId ) );
        }


        UUID lastValue = start;

        // stays true until the first id has been added to the results
        boolean firstPage = true;

        // always begin with the shard that holds the start id, whatever the direction
        long currentShard = startShard;

        // start and finishId never change inside the loop, so their ordering (checked above) is not re-tested
        while ( currentShard >= lowestShard && currentShard <= highestShard ) {

            if ( logger.isDebugEnabled() ) {
                logger.debug("Starting search with start UUID {}, finish UUID {}, and reversed {}",
                    lastValue, finishId, params.reversed);
            }


            SliceQuery<ByteBuffer, UUID, ByteBuffer> q = createSliceQuery( ko, be, ue, be );
            q.setColumnFamily( QUEUE_INBOX.getColumnFamily() );
            q.setKey( getQueueShardRowKey( queueId, currentShard ) );
            // one more than the limit is requested because the first column may be skipped below
            q.setRange( lastValue, finishId, params.reversed, params.limit + 1 );

            final List<HColumn<UUID, ByteBuffer>> cassResults = swallowOrderedExecution( q );


            for ( int i = 0; i < cassResults.size(); i++ ) {
                final HColumn<UUID, ByteBuffer> column = cassResults.get( i );

                final UUID columnName = column.getName();

                // the caller asked to skip the start id and it came back as the very first column;
                // the null check covers skipFirst being set without a startId
                final boolean isSkippedStart = i == 0 && firstPage && params.skipFirst && params.startId != null
                        && params.startId.equals( columnName );

                // the range start is inclusive, so a later query returns the last id already collected
                final boolean isAlreadyRead = !firstPage && lastValue != null && lastValue.equals( columnName );

                // skip the first one, we've already read it
                if ( isSkippedStart || isAlreadyRead ) {
                    continue;
                }


                lastValue = columnName;

                results.add( columnName );

                if ( logger.isDebugEnabled() ) {
                    logger.debug("Added id '{}' to result set for queue id '{}'", columnName, queueId);
                }

                if ( results.size() >= params.limit ) {
                    return results;
                }

                firstPage = false;
            }

            if ( params.reversed ) {
                currentShard -= QUEUE_SHARD_INTERVAL;
            }
            else {
                currentShard += QUEUE_SHARD_INTERVAL;
            }
        }

        return results;
    }