public void run()

in src/main/java/com/amazonaws/services/kinesis/aggregators/datastore/DynamoQueryEngine.java [417:503]


		/**
		 * Queries the index once per scatter prefix value in the half-open
		 * range {@code [start, start + range)} and folds the returned items
		 * into {@code resultKeys} as a mapping of label value to the set of
		 * date values seen for that label.
		 * <p>
		 * Each query is paged until no last evaluated key is returned. A page
		 * that fails with {@link ProvisionedThroughputExceededException} is
		 * retried up to 10 times with exponential backoff based on
		 * {@code BACKOFF_MILLIS}.
		 * <p>
		 * This method does not throw on failure. If the retries are exhausted,
		 * or the thread is interrupted while backing off, the cause is stored
		 * in {@code this.exception} and the method returns early; results
		 * already folded into {@code resultKeys} for earlier prefixes are
		 * kept.
		 */
		public void run() {
			for (int i = this.start; i < this.start + this.range; i++) {
				// restrict the query to the current scatter prefix
				Condition c = new Condition().withComparisonOperator(
						ComparisonOperator.EQ).withAttributeValueList(
						new AttributeValue().withN("" + i));
				this.conditions
						.put(DynamoDataStore.SCATTER_PREFIX_ATTRIBUTE, c);
				QueryRequest req = new QueryRequest()
						.withIndexName(this.indexName)
						.withTableName(this.tableName)
						.withKeyConditions(this.conditions);

				// items returned for this prefix only, so that earlier
				// prefixes are not pivoted again on every iteration
				List<Map<String, AttributeValue>> results = new ArrayList<>();

				Map<String, AttributeValue> lastKeyEvaluated = null;
				do {
					// continue from where the previous page stopped; null on
					// the first page starts from the beginning
					req.setExclusiveStartKey(lastKeyEvaluated);

					int queryAttempts = 0;
					QueryResult result = null;

					do {
						try {
							result = dynamoClient.query(req);
						} catch (ProvisionedThroughputExceededException e) {
							LOG.warn(String
									.format("Provisioned Throughput Exceeded - Retry Attempt %s",
											queryAttempts));

							try {
								// exponential backoff: BACKOFF_MILLIS * 2^attempt
								Thread.sleep((1L << queryAttempts)
										* BACKOFF_MILLIS);
							} catch (InterruptedException interrupted) {
								// restore the interrupt flag for whoever owns
								// this thread before reporting the failure
								Thread.currentThread().interrupt();
								this.exception = interrupted;
								return;
							}
							queryAttempts++;
						}
					} while (queryAttempts < 10 && result == null);

					if (result == null) {
						this.exception = new Exception(String.format(
								"Unable to execute Query after %s attempts",
								queryAttempts));
						return;
					}

					if (result.getItems() != null) {
						results.addAll(result.getItems());
					}

					lastKeyEvaluated = result.getLastEvaluatedKey();
				} while (lastKeyEvaluated != null
						&& !lastKeyEvaluated.isEmpty());

				// pivot the results into a map of label value to the set of
				// date values
				for (Map<String, AttributeValue> item : results) {
					// read both attributes fresh for every item so a missing
					// attribute never inherits a value from an earlier item
					AttributeValue labelAttr = item.get(this.labelAttribute);
					AttributeValue dateAttr = item.get(this.dateAttribute);
					String labelValue = labelAttr == null ? null : labelAttr
							.getS();
					String dateValue = dateAttr == null ? null : dateAttr
							.getS();

					if (labelValue != null && dateValue != null) {
						// get the current set of date values for the label,
						// or create a new one
						Set<String> values = resultKeys.get(labelValue);
						if (values == null) {
							values = new HashSet<>();
						}

						// add the current date value to the set of all date
						// values for the label
						values.add(dateValue);

						// write back the map of label to date values
						resultKeys.put(labelValue, values);
					}
				}
			}
		}