in src/main/java/com/amazonaws/services/kinesis/aggregators/datastore/DynamoQueryEngine.java [417:503]
/**
 * Queries the index once per scatter prefix in the half-open interval
 * [start, start + range) and folds the returned items into
 * {@code resultKeys}, mapping each label value to the set of date values
 * observed for that label.
 *
 * Each query is paged until the service stops returning a last evaluated
 * key. A page that fails with
 * {@link ProvisionedThroughputExceededException} is retried up to 10 times
 * with an exponentially growing sleep between attempts.
 *
 * On failure (retries exhausted, or the thread is interrupted while
 * backing off) the cause is stored in {@code this.exception} and the
 * method returns early; entries already written to {@code resultKeys} for
 * earlier prefixes are left in place.
 */
public void run() {
    for (int i = this.start; i < this.start + this.range; i++) {
        // restrict the query to the current scatter prefix
        Condition c = new Condition().withComparisonOperator(
                ComparisonOperator.EQ).withAttributeValueList(
                new AttributeValue().withN("" + i));
        this.conditions
                .put(DynamoDataStore.SCATTER_PREFIX_ATTRIBUTE, c);
        QueryRequest req = new QueryRequest()
                .withIndexName(this.indexName)
                .withTableName(this.tableName)
                .withKeyConditions(this.conditions);

        // items returned for this prefix only, so that earlier prefixes
        // are not pivoted again on every iteration
        List<Map<String, AttributeValue>> results = new ArrayList<>();
        Map<String, AttributeValue> lastKeyEvaluated = null;
        do {
            // Continue from where the previous page stopped. The key has
            // to be set on the request; setting it on the result would
            // overwrite the value returned by the service and end paging
            // after the first page.
            req.setExclusiveStartKey(lastKeyEvaluated);

            int queryAttempts = 0;
            QueryResult result = null;
            do {
                try {
                    result = dynamoClient.query(req);
                    results.addAll(result.getItems());
                } catch (ProvisionedThroughputExceededException e) {
                    LOG.warn(String
                            .format("Provisioned Throughput Exceeded - Retry Attempt %s",
                                    queryAttempts));
                    try {
                        // exponential backoff: BACKOFF_MILLIS * 2^attempt
                        // ('^' in Java is XOR, hence the shift)
                        Thread.sleep((1L << queryAttempts)
                                * BACKOFF_MILLIS);
                    } catch (InterruptedException interrupted) {
                        // keep the interrupt visible to whoever owns
                        // this thread
                        Thread.currentThread().interrupt();
                        this.exception = interrupted;
                        return;
                    }
                    queryAttempts++;
                }
            } while (queryAttempts < 10 && result == null);

            if (result == null) {
                this.exception = new Exception(String.format(
                        "Unable to execute Query after %s attempts",
                        queryAttempts));
                return;
            }

            lastKeyEvaluated = result.getLastEvaluatedKey();
        } while (lastKeyEvaluated != null);

        // pivot the items into a map of label value to set of date values
        for (Map<String, AttributeValue> map : results) {
            // read both values fresh for every item so that an item
            // lacking one of them cannot inherit it from the previous item
            AttributeValue label = map.get(this.labelAttribute);
            AttributeValue date = map.get(this.dateAttribute);
            String labelValue = label == null ? null : label.getS();
            String dateValue = date == null ? null : date.getS();

            if (labelValue != null && dateValue != null) {
                // get the current set of date values for the label, or
                // create a new one
                Set<String> values = resultKeys.get(labelValue);
                if (values == null) {
                    values = new HashSet<>();
                }
                // add the current date value to the set of all date
                // values for the label
                values.add(dateValue);
                // write back the map of label to date values
                resultKeys.put(labelValue, values);
            }
        }
    }
}