public PeekResult peek()

in src/main/java/com/awsblog/queueing/sdk/QueueSdkClient.java [527:668]


	/**
	 * Picks the next record from the queue and marks it as selected for processing.
	 *
	 * <p>The method pages through the index {@code Constants.QUEUEING_INDEX_NAME}
	 * (key condition {@code queued = 1}, ascending order, 250 items per page) and
	 * selects the first record that either
	 * <ul>
	 *   <li>is not currently flagged as {@code queue_selected} with a peek timestamp, or</li>
	 *   <li>is flagged as selected, but whose {@code peek_utc_timestamp} is older than
	 *       {@code Constants.VISIBILITY_TIMEOUT_IN_MINUTES} (a "straggler").</li>
	 * </ul>
	 *
	 * <p>The selected record is then updated conditionally: the update succeeds only if
	 * {@code system_info.version} still equals the version that was read by the query.
	 * On success the version is incremented, {@code queue_selected} is set to true, the
	 * peek/last-updated timestamps are refreshed and the status is set to
	 * {@code StatusEnum.PROCESSING_SHIPMENT}.
	 *
	 * @return a {@link PeekResult} whose return value is
	 *         {@code FAILED_EMPTY_QUEUE} when no candidate record was found,
	 *         {@code FAILED_DYNAMO_ERROR} when the conditional update threw, or
	 *         {@code SUCCESS} together with the id, version, status, timestamps and
	 *         the freshly re-read shipment object
	 */
	public PeekResult peek() {

		PeekResult result = new PeekResult();

		Map<String,AttributeValue> values = new HashMap<>();
		values.put(":one", new AttributeValue().withN("1"));

		Map<String,AttributeValue> exclusiveStartKey = null;

		String selectedID = null;
		int selectedVersion = 0;
		boolean recordForPeekIsFound = false;

		do {

			// Deliberately no filter on queue_selected: records that are already marked as
			// selected must be returned as well, otherwise stragglers could never be detected.
			QueryRequest queryRequest = new QueryRequest()
					.withProjectionExpression("id, queued, system_info")
					.withIndexName(Constants.QUEUEING_INDEX_NAME)
					.withTableName(this.actualTableName)
					.withKeyConditionExpression("queued = :one")
					.withLimit(250)
					.withScanIndexForward(true)
					.withExpressionAttributeValues(values)
					.withExclusiveStartKey(exclusiveStartKey);

			QueryResult queryResult = this.dynamoDB.query(queryRequest);

			// null once the last page has been read
			exclusiveStartKey = queryResult.getLastEvaluatedKey();

			for (Map<String,AttributeValue> itemMap : queryResult.getItems()) {

				Map<String, AttributeValue> sysMap = itemMap.get("system_info").getM();

				// getBOOL() returns a boxed Boolean which is null when the attribute is not
				// stored as a BOOL; compare null-safely instead of auto-unboxing.
				boolean isQueueSelected = sysMap.containsKey("queue_selected")
						&& Boolean.TRUE.equals(sysMap.get("queue_selected").getBOOL());

				// check if there are no stragglers (marked to be in processing but actually orphan)
				if (sysMap.containsKey("peek_utc_timestamp") && isQueueSelected) {

					long currentTS = System.currentTimeMillis();
					long lastPeekTimeUTC = Long.parseLong(sysMap.get("peek_utc_timestamp").getN());

					// if more than VISIBILITY_TIMEOUT_IN_MINUTES;
					// long literals keep the minutes -> millis conversion from overflowing int
					if (currentTS - lastPeekTimeUTC > (Constants.VISIBILITY_TIMEOUT_IN_MINUTES * 60L * 1000L)) {

						selectedID = itemMap.get("id").getS();
						selectedVersion = Integer.parseInt(sysMap.get("version").getN());
						recordForPeekIsFound = true;

						System.out.printf(" >> Converted struggler, Shipment ID: [%s], age: %d%n", itemMap.get("id").getS(), currentTS - lastPeekTimeUTC);
					}
				}

				// otherwise, peek first record that satisfy basic condition (queued = :one)
				else {

					selectedID = itemMap.get("id").getS();
					selectedVersion = Integer.parseInt(sysMap.get("version").getN());
					recordForPeekIsFound = true;
				}

				// no need to go further
				if (recordForPeekIsFound) break;
			}

		} while (!recordForPeekIsFound && exclusiveStartKey != null);

		if (Utils.checkIfNullObject(selectedID)) {

			result.setReturnValue(ReturnStatusEnum.FAILED_EMPTY_QUEUE);
			return result;
		}

		// assign ID to 'result'
		result.setId(selectedID);

		OffsetDateTime odt = OffsetDateTime.now(ZoneOffset.UTC);

		DynamoDB ddb = new DynamoDB(this.dynamoDB);
		Table table = ddb.getTable(this.actualTableName);

		long tsUTC = System.currentTimeMillis();

		UpdateItemOutcome outcome = null;

		try {

			// IMPORTANT
			// please note, we are not updating top-level attribute `last_updated_timestamp` in order to avoid re-indexing the order

			// The id returned by the query is used directly as the key; no extra read of the
			// record is needed before the update. The condition on the version guarantees
			// that the record has not been modified since the query read it.
			UpdateItemSpec updateItemSpec = new UpdateItemSpec()
				.withPrimaryKey("id", selectedID)
				.withUpdateExpression("ADD #sys.#v :one "
						+ "SET #sys.queue_selected = :true, "
						+ "#sys.last_updated_timestamp = :lut, "
						+ "#sys.queue_peek_timestamp = :lut, "
						+ "#sys.peek_utc_timestamp = :ts, #sys.#st = :st")
				.withNameMap(new NameMap()
						.with("#v", "version")
						.with("#st", "status")
						.with("#sys", "system_info"))
				.withValueMap(
						new ValueMap()
							.withInt(":one", 1)
							.withInt(":v", selectedVersion)
							.withBoolean(":true", true)
							.withLong(":ts", tsUTC)
							.withString(":st", StatusEnum.PROCESSING_SHIPMENT.toString())
							.withString(":lut", odt.toString()))
				.withConditionExpression("#sys.#v = :v")
				.withReturnValues(ReturnValue.ALL_NEW);

			outcome = table.updateItem(updateItemSpec);
		}
		catch (Exception e) {
			System.err.println("peek() - failed to update multiple attributes in " + this.actualTableName);
			System.err.println(e.getMessage());

			result.setReturnValue(ReturnStatusEnum.FAILED_DYNAMO_ERROR);
			return result;
		}

		result.setId(outcome.getItem().getString("id"));

		// adding this to get the fresh data from DDB
		Shipment peekedShipment = this.get(selectedID);
		result.setPeekedShipmentObject(peekedShipment);

		// ALL_NEW was requested, so the outcome carries the post-update system_info map
		Map<String, Object> updatedSysMap = outcome.getItem().getRawMap("system_info");
		result.setVersion(((BigDecimal)updatedSysMap.get("version")).intValue());
		result.setLastUpdatedTimestamp((String)updatedSysMap.get("last_updated_timestamp"));
		result.setStatus(StatusEnum.valueOf((String)updatedSysMap.get("status")));

		// Epoch milliseconds do not fit into an int; intValue() would silently truncate.
		// NOTE(review): assumes PeekResult.setTimestampMillisUTC accepts a long - confirm.
		result.setTimestampMillisUTC(((BigDecimal)updatedSysMap.get("peek_utc_timestamp")).longValue());

		result.setReturnValue(ReturnStatusEnum.SUCCESS);
		return result;
	}