in src/main/java/com/awsblog/queueing/sdk/QueueSdkClient.java [527:668]
/**
 * Selects the next record from the queueing index and marks it as being processed.
 *
 * <p>The method works in two phases:
 * <ol>
 *   <li>Pages through the index {@code Constants.QUEUEING_INDEX_NAME} (key condition
 *       {@code queued = 1}, ascending order, 250 items per page) and picks the first item that
 *       is either not currently selected, or is selected but whose {@code peek_utc_timestamp}
 *       is older than {@code Constants.VISIBILITY_TIMEOUT_IN_MINUTES} (a straggler).</li>
 *   <li>Conditionally updates that item: increments {@code system_info.version}, sets
 *       {@code queue_selected}, the peek timestamps and the status to
 *       {@code PROCESSING_SHIPMENT}. The update is conditional on the version still being the
 *       one read in phase 1, so it fails if the item was modified in between.</li>
 * </ol>
 *
 * @return a {@link PeekResult} whose return value is {@code FAILED_EMPTY_QUEUE} when no
 *         candidate was found, {@code FAILED_DYNAMO_ERROR} when the conditional update threw
 *         (including a failed version check), or {@code SUCCESS} with the id, version, status,
 *         timestamps and the freshly re-read shipment populated
 */
public PeekResult peek() {
    PeekResult result = new PeekResult();

    Map<String, AttributeValue> values = new HashMap<>();
    values.put(":one", new AttributeValue().withN("1"));

    // The request is identical for every page; only the exclusive start key changes.
    QueryRequest queryRequest = new QueryRequest()
            .withProjectionExpression("id, queued, system_info")
            .withIndexName(Constants.QUEUEING_INDEX_NAME)
            .withTableName(this.actualTableName)
            .withKeyConditionExpression("queued = :one")
            // no filter on queue_selected here: selected items must be seen to detect stragglers
            .withLimit(250)
            .withScanIndexForward(true)
            .withExpressionAttributeValues(values);

    // Computed in long arithmetic so that a large timeout cannot overflow int.
    final long visibilityTimeoutMillis = Constants.VISIBILITY_TIMEOUT_IN_MINUTES * 60L * 1000L;

    Map<String, AttributeValue> exclusiveStartKey = null;
    String selectedID = null;
    int selectedVersion = 0;
    boolean recordForPeekIsFound = false;

    do {
        queryRequest.withExclusiveStartKey(exclusiveStartKey);
        QueryResult queryResult = this.dynamoDB.query(queryRequest);
        exclusiveStartKey = queryResult.getLastEvaluatedKey();

        for (Map<String, AttributeValue> itemMap : queryResult.getItems()) {
            Map<String, AttributeValue> sysMap = itemMap.get("system_info").getM();

            // Boolean.TRUE.equals(...) avoids an unboxing NPE when the attribute is present
            // but not stored as a BOOL (getBOOL() returns null in that case).
            boolean isQueueSelected = sysMap.containsKey("queue_selected")
                    && Boolean.TRUE.equals(sysMap.get("queue_selected").getBOOL());

            if (sysMap.containsKey("peek_utc_timestamp") && isQueueSelected) {
                // Already selected by a previous peek: only take it over if it is a straggler,
                // i.e. it has been in that state longer than the visibility timeout.
                long currentTS = System.currentTimeMillis();
                long lastPeekTimeUTC = Long.parseLong(sysMap.get("peek_utc_timestamp").getN());

                if (currentTS - lastPeekTimeUTC > visibilityTimeoutMillis) {
                    selectedID = itemMap.get("id").getS();
                    selectedVersion = Integer.parseInt(sysMap.get("version").getN());
                    recordForPeekIsFound = true;

                    System.out.printf(" >> Converted struggler, Shipment ID: [%s], age: %d%n", itemMap.get("id").getS(), currentTS - lastPeekTimeUTC);
                }
            }
            else {
                // Not selected: the first record satisfying the key condition is taken.
                selectedID = itemMap.get("id").getS();
                selectedVersion = Integer.parseInt(sysMap.get("version").getN());
                recordForPeekIsFound = true;
            }

            // no need to go further
            if (recordForPeekIsFound) break;
        }
    } while (!recordForPeekIsFound && exclusiveStartKey != null);

    if (Utils.checkIfNullObject(selectedID)) {
        result.setReturnValue(ReturnStatusEnum.FAILED_EMPTY_QUEUE);
        return result;
    }

    // assign ID to 'result'
    result.setId(selectedID);

    OffsetDateTime odt = OffsetDateTime.now(ZoneOffset.UTC);
    long tsUTC = System.currentTimeMillis();

    DynamoDB ddb = new DynamoDB(this.dynamoDB);
    Table table = ddb.getTable(this.actualTableName);

    UpdateItemOutcome outcome = null;
    try {
        // IMPORTANT
        // please note, we are not updating top-level attribute `last_updated_timestamp`
        // in order to avoid re-indexing the order
        //
        // The primary key is the id read from the index; no extra read of the item is needed
        // just to obtain it. If the item disappeared or changed since the query, the version
        // condition below fails and the error path is taken.
        UpdateItemSpec updateItemSpec = new UpdateItemSpec()
                .withPrimaryKey("id", selectedID)
                .withUpdateExpression("ADD #sys.#v :one "
                        + "SET #sys.queue_selected = :true, "
                        + "#sys.last_updated_timestamp = :lut, "
                        + "#sys.queue_peek_timestamp = :lut, "
                        + "#sys.peek_utc_timestamp = :ts, #sys.#st = :st")
                .withNameMap(new NameMap()
                        .with("#v", "version")
                        .with("#st", "status")
                        .with("#sys", "system_info"))
                .withValueMap(
                        new ValueMap()
                                .withInt(":one", 1)
                                .withInt(":v", selectedVersion)
                                .withBoolean(":true", true)
                                .withLong(":ts", tsUTC)
                                .withString(":st", StatusEnum.PROCESSING_SHIPMENT.toString())
                                .withString(":lut", odt.toString()))
                // optimistic lock: only update if nobody changed the version since the query
                .withConditionExpression("#sys.#v = :v")
                .withReturnValues(ReturnValue.ALL_NEW);

        outcome = table.updateItem(updateItemSpec);
    }
    catch (Exception e) {
        System.err.println("peek() - failed to update multiple attributes in " + this.actualTableName);
        System.err.println(e.getMessage());

        result.setReturnValue(ReturnStatusEnum.FAILED_DYNAMO_ERROR);
        return result;
    }

    result.setId(outcome.getItem().getString("id"));

    // adding this to get the fresh data from DDB
    Shipment peekedShipment = this.get(selectedID);
    result.setPeekedShipmentObject(peekedShipment);

    Map<String, Object> sysMap = outcome.getItem().getRawMap("system_info");
    result.setVersion(((BigDecimal) sysMap.get("version")).intValue());
    result.setLastUpdatedTimestamp((String) sysMap.get("last_updated_timestamp"));
    result.setStatus(StatusEnum.valueOf((String) sysMap.get("status")));
    // Epoch milliseconds do not fit in an int; read the value back as a long.
    // NOTE(review): assumes setTimestampMillisUTC accepts a long - confirm against PeekResult.
    result.setTimestampMillisUTC(((BigDecimal) sysMap.get("peek_utc_timestamp")).longValue());

    result.setReturnValue(ReturnStatusEnum.SUCCESS);
    return result;
}