in src/main/java/com/amazonaws/services/kinesis/aggregators/datastore/DynamoQueryEngine.java [226:381]
public void run() {
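// parallel scan: each worker reads only its own segment of the
// table, selected by workerInstance out of the total number of
// scan threads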
ScanRequest scanRequest = new ScanRequest()
.withTableName(this.tableName)
.withAttributesToGet(this.hashKey)
.withSegment(this.workerInstance)
.withTotalSegments(threads);
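// deduplicated map of hash key value -> set of range key values;
// the set stays null when only hash keys are in scope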
Map<String, Set<String>> deduplicated = new HashMap<>();
Set<String> rangeValues = null;
Map<String, AttributeValue> lastKeyEvaluated = null;
int scanAttempts = 0;
int limit = -1;
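// a limit of -1 means scan without a page size limit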
boolean returnedResults = false;
String lastLabel = null;
int uniqueLabels = 0;
do {
ScanResult result = null;
// set query limits, to optimise for skip scan or for hash/range
// query with no limit
if (this.scope.equals(QueryKeyScope.HashKey)) {
if (uniqueLabels > 0 && uniqueLabels == resultsProcessed) {
// remove the query limit if every row being returned is
// unique
limit = -1;
} else {
// set a limit of twice the number of uniques, so we can
// get a larger result set as we go
if (uniqueLabels == 0) {
limit = 100;
} else {
limit = uniqueLabels * 2;
}
// reset the unique labels so it doesn't grow without
// limit
uniqueLabels = 0;
}
} else {
// hash and range scope: fetch both key attributes. the
// collection overload replaces the attribute list, so
// repeated passes of the outer loop don't append duplicates
scanRequest.withAttributesToGet(java.util.Arrays.asList(
this.hashKey, this.rangeKey));
}
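// retry the scan with exponential backoff when provisioned
// throughput is exceeded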
do {
try {
// set the limit if we have one, otherwise clear any limit
// left on the request from an earlier pass
if (limit != -1) {
scanRequest.withLimit(limit);
} else {
scanRequest.setLimit(null);
}
result = dynamoClient.scan(scanRequest
.withExclusiveStartKey(lastKeyEvaluated));
returnedResults = !result.getItems().isEmpty();
} catch (ProvisionedThroughputExceededException e) {
LOG.warn(String
.format("Provisioned Throughput Exceeded - Retry Attempt %s",
scanAttempts));
// back off exponentially; ^ is XOR in Java, so the power
// of two must be computed with a shift
try {
Thread.sleep((1L << scanAttempts) * BACKOFF_MILLIS);
} catch (InterruptedException interrupted) {
// restore the interrupt flag before exiting
Thread.currentThread().interrupt();
this.exception = interrupted;
return;
}
scanAttempts++;
}
} while (scanAttempts < 10 && result == null);
if (result == null) {
this.exception = new Exception(String.format(
"Unable to execute Scan after %s attempts",
scanAttempts));
return;
}
// process the results, creating a deduplicated map/set of
// hash/range keys
String labelValue = null;
if (returnedResults) {
for (Map<String, AttributeValue> map : result.getItems()) {
resultsProcessed++;
labelValue = map.get(this.hashKey).getS();
// only enter the label value into the hash once
if (scope.equals(QueryKeyScope.HashKey)) {
// equals(null) is false, so no separate null check is needed
if (!labelValue.equals(lastLabel)) {
deduplicated.put(labelValue, null);
lastLabel = labelValue;
uniqueLabels++;
}
} else {
if (deduplicated.containsKey(labelValue)) {
rangeValues = deduplicated.get(labelValue);
} else {
rangeValues = new HashSet<>();
}
rangeValues.add(map.get(this.rangeKey).getS());
deduplicated.put(labelValue, rangeValues);
}
}
// set the last evaluated key. in hash key scope, when we
// are not yet at the end of the result set, force a skip
// forward on the date range key so we don't keep paging
// through the remaining rows of a high cardinality hash
// value
if (this.scope.equals(QueryKeyScope.HashKey)
&& result.getLastEvaluatedKey() != null) {
// skip scan
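// the far-future sentinel date places the next page start
// beyond every real range value for the current hash key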
lastKeyEvaluated = new HashMap<>();
lastKeyEvaluated.put(this.hashKey,
new AttributeValue().withS(labelValue));
lastKeyEvaluated.put(this.rangeKey,
new AttributeValue()
.withS("4000-01-01 00:00:00"));
} else {
lastKeyEvaluated = result.getLastEvaluatedKey();
}
} else {
lastKeyEvaluated = null;
}
} while (lastKeyEvaluated != null);
if (this.scope.equals(QueryKeyScope.HashKey)) {
LOG.debug(String.format("Worker %s extracted %s results",
this.workerInstance, deduplicated.size()));
} else {
LOG.debug(String
.format("Worker %s deduplicated %s results, creating distinct set of %s keys",
this.workerInstance, resultsProcessed,
deduplicated.size()));
}
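// convert the deduplicated keys into the output list of
// TableKeyStructure entries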
this.output = new ArrayList<>();
if (!deduplicated.isEmpty()) {
for (String s : deduplicated.keySet()) {
TableKeyStructure t = new TableKeyStructure(this.hashKey,
s, this.rangeKey);
if (scope.equals(QueryKeyScope.HashAndRangeKey)) {
for (String rangeValue : deduplicated.get(s)) {
t.withDateValue(rangeValue);
}
}
output.add(t);
}
}
}