in src/main/java/com/awsblog/queueing/sdk/QueueSdkClient.java [154:215]
/**
 * Collects statistics about the records currently present in the queue.
 *
 * <p>Queries the index {@code Constants.QUEUEING_INDEX_NAME} for every item whose
 * {@code queued} attribute equals 1, following {@code LastEvaluatedKey} until all pages
 * have been read. Every returned item counts towards the queue total; an item whose
 * {@code system_info.queue_selected} flag is {@code true} additionally counts as being
 * in processing. The IDs of the first 100 items seen (and of the first 100 selected
 * items seen) are kept as samples.
 *
 * <p>Items whose {@code system_info} map or {@code queue_selected} flag is missing, or
 * is not stored with the expected type, are counted as not selected instead of
 * failing the whole call.
 *
 * @return a {@link QueueStats} holding the total, in-processing and not-started counts;
 *         the ID sample lists are only set when they are non-empty
 */
public QueueStats getQueueStats() {

    // Maximum number of items requested per query page.
    final int pageSize = 250;
    // Upper bound on the number of IDs kept in each sample list.
    final int maxSampledIds = 100;

    Map<String, String> names = new HashMap<>();
    names.put("#q", "queued");

    Map<String, AttributeValue> values = new HashMap<>();
    values.put(":one", new AttributeValue().withN("1"));

    int totalQueueSize = 0;
    int peekedRecords = 0;
    List<String> allQueueIDs = new ArrayList<>();
    List<String> processingIDs = new ArrayList<>();

    Map<String, AttributeValue> exclusiveStartKey = null;

    do {
        QueryRequest queryRequest = new QueryRequest()
                .withProjectionExpression("id, system_info")
                .withIndexName(Constants.QUEUEING_INDEX_NAME)
                .withTableName(this.actualTableName)
                .withExpressionAttributeNames(names)
                .withKeyConditionExpression("#q = :one")
                .withScanIndexForward(true)
                .withLimit(pageSize)
                .withExpressionAttributeValues(values)
                // null on the first iteration, which starts the query from the beginning
                .withExclusiveStartKey(exclusiveStartKey);

        QueryResult queryResult = this.dynamoDB.query(queryRequest);
        exclusiveStartKey = queryResult.getLastEvaluatedKey();

        for (Map<String, AttributeValue> itemMap : queryResult.getItems()) {

            ++totalQueueSize;

            // getM() and getBOOL() return null when the attribute holds a different type,
            // so every step is checked before it is dereferenced or unboxed.
            AttributeValue sysInfo = itemMap.get("system_info");
            Map<String, AttributeValue> sysMap = (sysInfo == null) ? null : sysInfo.getM();
            AttributeValue selectedFlag = (sysMap == null) ? null : sysMap.get("queue_selected");
            boolean isQueueSelected = selectedFlag != null && Boolean.TRUE.equals(selectedFlag.getBOOL());

            if (isQueueSelected) {
                ++peekedRecords;
                if (processingIDs.size() < maxSampledIds) {
                    processingIDs.add(itemMap.get("id").getS());
                }
            }

            if (allQueueIDs.size() < maxSampledIds) {
                allQueueIDs.add(itemMap.get("id").getS());
            }
        }

    // A null (or empty) last-evaluated key means there are no more pages to read.
    } while (exclusiveStartKey != null && !exclusiveStartKey.isEmpty());

    QueueStats result = new QueueStats();
    result.setTotalRecordsInProcessing(peekedRecords);
    result.setTotalRecordsInQueue(totalQueueSize);
    result.setTotalRecordsNotStarted(totalQueueSize - peekedRecords);

    if (Utils.checkIfNotNullAndNotEmptyCollection(allQueueIDs)) {
        result.setFirst100IDsInQueue(allQueueIDs);
    }
    if (Utils.checkIfNotNullAndNotEmptyCollection(processingIDs)) {
        result.setFirst100SelectedIDsInQueue(processingIDs);
    }

    return result;
}