public void run()

in src/main/java/com/amazonaws/services/kinesis/aggregators/datastore/DynamoQueryEngine.java [226:381]


		/** Maximum number of attempts made for a single scan page before giving up. */
		private static final int MAX_SCAN_ATTEMPTS = 10;

		/**
		 * Scans this worker's segment of the table and builds the distinct set
		 * of keys found in it.
		 * <p>
		 * For {@code QueryKeyScope.HashKey} only the hash key is projected and
		 * a skip scan is used: whenever a page ends with more data remaining,
		 * the next page starts after a synthetic key made of the last label seen
		 * and the range value {@code "4000-01-01 00:00:00"} (presumably later
		 * than any real range value - confirm against the table's data), so the
		 * remaining rows of that label are not read. The page limit is adapted
		 * from how many distinct labels recent pages returned. For any other
		 * scope both keys are projected and every range value is collected per
		 * hash value.
		 * <p>
		 * On success {@code output} holds one {@code TableKeyStructure} per
		 * distinct hash value. On failure {@code exception} is set and the
		 * method returns without assigning {@code output}.
		 */
		public void run() {
			// Set the projection exactly once: withAttributesToGet(String...)
			// appends to any existing list rather than replacing it.
			ScanRequest scanRequest = new ScanRequest()
					.withTableName(this.tableName)
					.withSegment(this.workerInstance)
					.withTotalSegments(threads);
			if (this.scope.equals(QueryKeyScope.HashKey)) {
				scanRequest.withAttributesToGet(this.hashKey);
			} else {
				scanRequest.withAttributesToGet(this.hashKey, this.rangeKey);
			}

			Map<String, Set<String>> deduplicated = new HashMap<>();
			Map<String, AttributeValue> lastKeyEvaluated = null;
			int limit = -1;
			String lastLabel = null;
			// label changes and rows seen since the limit was last recalculated;
			// both counters cover the same window so they can be compared
			int uniqueLabels = 0;
			int windowRows = 0;

			do {
				// set query limits, to optimise for skip scan or for hash/range
				// query with no limit
				if (this.scope.equals(QueryKeyScope.HashKey)) {
					if (uniqueLabels > 0 && uniqueLabels == windowRows) {
						// every row in the window was a new label: remove the
						// query limit
						limit = -1;
					} else {
						// start at 100, then allow twice the number of labels
						// seen so the result set grows as we go
						limit = (uniqueLabels == 0) ? 100 : uniqueLabels * 2;

						// reset the window so the counters don't grow without
						// limit
						uniqueLabels = 0;
						windowRows = 0;
					}
				}

				// a null limit clears any limit applied to a previous page
				scanRequest.withLimit(limit == -1 ? null : Integer.valueOf(limit))
						.withExclusiveStartKey(lastKeyEvaluated);

				ScanResult result;
				try {
					result = scanPageWithBackoff(scanRequest);
				} catch (InterruptedException interrupted) {
					// restore the interrupt flag for whoever owns this thread
					Thread.currentThread().interrupt();
					this.exception = interrupted;
					return;
				}

				if (result == null) {
					this.exception = new Exception(String.format(
							"Unable to execute Scan after %s attempts",
							MAX_SCAN_ATTEMPTS));
					return;
				}

				if (result.getItems().isEmpty()) {
					// an empty page is not necessarily the last one: follow the
					// pagination key DynamoDB returned (null ends the scan)
					lastKeyEvaluated = result.getLastEvaluatedKey();
					continue;
				}

				// process the results, creating a deduplicated map/set of
				// hash/range keys
				String labelValue = null;
				for (Map<String, AttributeValue> item : result.getItems()) {
					resultsProcessed++;
					windowRows++;

					labelValue = item.get(this.hashKey).getS();

					if (this.scope.equals(QueryKeyScope.HashKey)) {
						// consecutive rows with the same label count as one
						// unique label
						if (!labelValue.equals(lastLabel)) {
							deduplicated.put(labelValue, null);
							lastLabel = labelValue;
							uniqueLabels++;
						}
					} else {
						Set<String> rangeValues = deduplicated.get(labelValue);
						if (rangeValues == null) {
							rangeValues = new HashSet<>();
							deduplicated.put(labelValue, rangeValues);
						}
						rangeValues.add(item.get(this.rangeKey).getS());
					}
				}

				// set the last evaluated key. if we are not at the end of the
				// result set, force a skip forward on date to avoid continued
				// processing of high cardinality hash values
				if (this.scope.equals(QueryKeyScope.HashKey)
						&& result.getLastEvaluatedKey() != null) {
					// skip scan
					lastKeyEvaluated = new HashMap<>();
					lastKeyEvaluated.put(this.hashKey,
							new AttributeValue().withS(labelValue));
					lastKeyEvaluated.put(this.rangeKey,
							new AttributeValue()
									.withS("4000-01-01 00:00:00"));
				} else {
					lastKeyEvaluated = result.getLastEvaluatedKey();
				}
			} while (lastKeyEvaluated != null);

			if (this.scope.equals(QueryKeyScope.HashKey)) {
				LOG.debug(String.format("Worker %s extracted %s results",
						this.workerInstance, deduplicated.size()));
			} else {
				LOG.debug(String
						.format("Worker %s deduplicated %s results, creating distinct set of %s keys",
								this.workerInstance, resultsProcessed,
								deduplicated.size()));
			}

			this.output = new ArrayList<>();
			for (Map.Entry<String, Set<String>> entry : deduplicated.entrySet()) {
				TableKeyStructure t = new TableKeyStructure(this.hashKey,
						entry.getKey(), this.rangeKey);

				if (scope.equals(QueryKeyScope.HashAndRangeKey)) {
					for (String rangeValue : entry.getValue()) {
						t.withDateValue(rangeValue);
					}
				}

				output.add(t);
			}
		}

		/**
		 * Executes one scan page, retrying on
		 * {@code ProvisionedThroughputExceededException} with exponential
		 * back-off ({@code BACKOFF_MILLIS}, then 2x, 4x, ...). Each page gets
		 * its own budget of {@code MAX_SCAN_ATTEMPTS} attempts, and no sleep
		 * happens after the final failed attempt.
		 *
		 * @param request the fully configured scan request for this page
		 * @return the page's result, or {@code null} if every attempt was
		 *         throttled
		 * @throws InterruptedException if interrupted while backing off
		 */
		private ScanResult scanPageWithBackoff(ScanRequest request)
				throws InterruptedException {
			for (int attempt = 0; attempt < MAX_SCAN_ATTEMPTS; attempt++) {
				try {
					return dynamoClient.scan(request);
				} catch (ProvisionedThroughputExceededException e) {
					LOG.warn(String
							.format("Provisioned Throughput Exceeded - Retry Attempt %s",
									attempt));

					if (attempt + 1 < MAX_SCAN_ATTEMPTS) {
						// 2^attempt * BACKOFF_MILLIS; '^' would be XOR in Java
						Thread.sleep((1L << attempt) * BACKOFF_MILLIS);
					}
				}
			}
			return null;
		}