in stack/core/src/main/java/org/apache/usergrid/mq/cassandra/QueueManagerImpl.java [786:865]
public List<AggregateCounterSet> getAggregateCounters( UUID queueId, CounterQuery query ) throws Exception {
CounterResolution resolution = query.getResolution();
if ( resolution == null ) {
resolution = CounterResolution.ALL;
}
long start = query.getStartTime() != null ? query.getStartTime() : 0;
long finish = query.getFinishTime() != null ? query.getFinishTime() : 0;
boolean pad = query.isPad();
if ( start <= 0 ) {
start = 0;
}
if ( ( finish <= 0 ) || ( finish < start ) ) {
finish = System.currentTimeMillis();
}
start = resolution.round( start );
finish = resolution.round( finish );
long expected_time = start;
if ( pad && ( resolution != CounterResolution.ALL ) ) {
long max_counters = ( finish - start ) / resolution.interval();
if ( max_counters > 1000 ) {
finish = resolution.round( start + ( resolution.interval() * 1000 ) );
}
}
List<CounterFilterPredicate> filters = query.getCounterFilters();
if ( filters == null ) {
return null;
}
Map<String, org.apache.usergrid.persistence.cassandra.CounterUtils.AggregateCounterSelection> selections =
new HashMap<String, AggregateCounterSelection>();
Keyspace ko = cass.getApplicationKeyspace( applicationId );
for ( CounterFilterPredicate filter : filters ) {
AggregateCounterSelection selection =
new AggregateCounterSelection( filter.getName(), null, null, queueId, filter.getCategory() );
selections.put( selection.getRow( resolution ), selection );
}
MultigetSliceCounterQuery<String, Long> q = HFactory.createMultigetSliceCounterQuery( ko, se, le );
q.setColumnFamily( APPLICATION_AGGREGATE_COUNTERS.getColumnFamily() );
q.setRange( start, finish, false, ALL_COUNT );
QueryResult<CounterRows<String, Long>> rows = q.setKeys( selections.keySet() ).execute();
List<AggregateCounterSet> countSets = new ArrayList<AggregateCounterSet>();
for ( CounterRow<String, Long> r : rows.get() ) {
expected_time = start;
List<AggregateCounter> counters = new ArrayList<AggregateCounter>();
for ( HCounterColumn<Long> column : r.getColumnSlice().getColumns() ) {
AggregateCounter count = new AggregateCounter( column.getName(), column.getValue() );
if ( pad && ( resolution != CounterResolution.ALL ) ) {
while ( count.getTimestamp() != expected_time ) {
counters.add( new AggregateCounter( expected_time, 0 ) );
expected_time = resolution.next( expected_time );
}
expected_time = resolution.next( expected_time );
}
counters.add( count );
}
if ( pad && ( resolution != CounterResolution.ALL ) ) {
while ( expected_time <= finish ) {
counters.add( new AggregateCounter( expected_time, 0 ) );
expected_time = resolution.next( expected_time );
}
}
AggregateCounterSelection selection = selections.get( r.getKey() );
countSets.add( new AggregateCounterSet( selection.getName(), queueId, selection.getCategory(), counters ) );
}
Collections.sort( countSets, new Comparator<AggregateCounterSet>() {
@Override
public int compare( AggregateCounterSet o1, AggregateCounterSet o2 ) {
String s1 = o1.getName();
String s2 = o2.getName();
return s1.compareTo( s2 );
}
} );
return countSets;
}