in src/main/java/com/amazonaws/services/kinesis/aggregators/datastore/DynamoUtils.java [254:318]
/**
 * Removes every item from an aggregate table whose
 * {@link StreamAggregator#LAST_WRITE_SEQ} value is strictly below the
 * supplied threshold.
 * <p>
 * The table is read with a paginated full scan, and each qualifying item is
 * deleted by its primary key. Both DynamoDB clients created here are shut
 * down before the method returns, whether it completes normally or throws.
 *
 * @param credentials
 *            provider used to build the DynamoDB clients
 * @param region
 *            region to target; when {@code null} the clients keep their
 *            default endpoint
 * @param dynamoTable
 *            name of the table to clean up
 * @param toSeq
 *            numeric threshold, as a string; items whose stored sequence
 *            value is below it are deleted
 * @throws NumberFormatException
 *             if {@code toSeq}, or a stored sequence value, is not a valid
 *             decimal number
 * @throws Exception
 *             if a DynamoDB call fails
 */
public static void cleanupAggTable(AWSCredentialsProvider credentials,
        Region region, final String dynamoTable, final String toSeq)
        throws Exception {
    // Compare as exact decimals: a double keeps only ~15-17 significant
    // digits, so long sequence values that differ in their low digits
    // would otherwise compare as equal and escape deletion.
    // NOTE(review): presumably these are Kinesis sequence numbers - confirm.
    final java.math.BigDecimal deleteBelow = new java.math.BigDecimal(
            toSeq.trim());

    // one client for reading all candidate values, and another for the
    // delete operations
    final AmazonDynamoDB dynamoClient = new AmazonDynamoDBClient(
            credentials);
    try {
        if (region != null) {
            dynamoClient.setRegion(region);
        }

        final AmazonDynamoDBAsyncClient deleteCli = new AmazonDynamoDBAsyncClient(
                credentials);
        try {
            // same null guard as the read client, so a null region means
            // "use the default endpoint" for both
            if (region != null) {
                deleteCli.setRegion(region);
            }

            final int deleteCount = purgeItemsBelow(dynamoClient,
                    deleteCli, dynamoTable, deleteBelow);

            LOG.info(String.format(
                    "Operation Complete - %s Records removed from Aggregate Store",
                    deleteCount));
        } finally {
            deleteCli.shutdown();
        }
    } finally {
        dynamoClient.shutdown();
    }
}

/**
 * Scans {@code dynamoTable} page by page and deletes each item whose
 * last-write sequence value is below {@code deleteBelow}.
 *
 * @return the number of delete requests issued
 */
private static int purgeItemsBelow(final AmazonDynamoDB scanClient,
        final AmazonDynamoDBAsyncClient deleteCli,
        final String dynamoTable, final java.math.BigDecimal deleteBelow) {
    // work out what the key and date column name is
    String keyColumn = null;
    String dateColumn = null;
    final List<KeySchemaElement> keySchema = scanClient
            .describeTable(dynamoTable).getTable().getKeySchema();
    for (KeySchemaElement element : keySchema) {
        if (element.getKeyType().equals(KeyType.HASH.name())) {
            keyColumn = element.getAttributeName();
        }
        if (element.getKeyType().equals(KeyType.RANGE.name())) {
            dateColumn = element.getAttributeName();
        }
    }

    LOG.info(String.format(
            "Deleting data from %s where %s values are below %s",
            dynamoTable, StreamAggregator.LAST_WRITE_SEQ,
            deleteBelow.toPlainString()));

    int deleteCount = 0;
    int skippedCount = 0;
    Map<String, AttributeValue> lastKey = null;
    do {
        // read the next page of data from the table
        final ScanRequest scan = new ScanRequest().withTableName(
                dynamoTable).withExclusiveStartKey(lastKey);
        // a table without a range key has no date column to project
        if (dateColumn == null) {
            scan.withAttributesToGet(keyColumn,
                    StreamAggregator.LAST_WRITE_SEQ);
        } else {
            scan.withAttributesToGet(keyColumn, dateColumn,
                    StreamAggregator.LAST_WRITE_SEQ);
        }
        final ScanResult results = scanClient.scan(scan);

        // delete everything up to the system provided change number
        for (Map<String, AttributeValue> item : results.getItems()) {
            final AttributeValue seq = item
                    .get(StreamAggregator.LAST_WRITE_SEQ);
            // an item with no string sequence value cannot be compared;
            // leave it in place rather than abort the whole cleanup
            if (seq == null || seq.getS() == null) {
                skippedCount++;
                continue;
            }

            if (new java.math.BigDecimal(seq.getS().trim())
                    .compareTo(deleteBelow) < 0) {
                final Map<String, AttributeValue> deleteKey = new HashMap<>();
                deleteKey.put(keyColumn, item.get(keyColumn));
                if (dateColumn != null) {
                    deleteKey.put(dateColumn, item.get(dateColumn));
                }
                // deleteItem (not deleteItemAsync) is the blocking call,
                // so each delete finishes before the next one is issued
                deleteCli.deleteItem(dynamoTable, deleteKey);
                deleteCount++;
            }
        }

        // a null last-evaluated key ends the pagination loop
        lastKey = results.getLastEvaluatedKey();
    } while (lastKey != null);

    if (skippedCount > 0) {
        LOG.info(String.format(
                "Skipped %s items in %s with no string %s value",
                skippedCount, dynamoTable,
                StreamAggregator.LAST_WRITE_SEQ));
    }
    return deleteCount;
}