public static void cleanupAggTable()

in src/main/java/com/amazonaws/services/kinesis/aggregators/datastore/DynamoUtils.java [254:318]


	/**
	 * Deletes every item of an aggregate table whose
	 * {@code StreamAggregator.LAST_WRITE_SEQ} value is strictly below the
	 * supplied threshold.
	 * <p>
	 * The table's key schema is read first to learn the hash key and (if
	 * present) range key attribute names. The table is then scanned page by
	 * page, and each item whose stored sequence value is lower than
	 * {@code toSeq} is deleted with a blocking call before the next item is
	 * examined. Items that carry no sequence value are left untouched and
	 * reported in the log.
	 *
	 * @param credentials
	 *            provider of the AWS credentials used to build the DynamoDB
	 *            client
	 * @param region
	 *            region to bind the client to; when {@code null} the client
	 *            keeps its default region
	 * @param dynamoTable
	 *            name of the table to clean up
	 * @param toSeq
	 *            decimal text of the threshold; items whose stored sequence
	 *            value is smaller than this are deleted
	 * @throws NumberFormatException
	 *             if {@code toSeq} or a stored sequence value is not a valid
	 *             decimal number
	 * @throws Exception
	 *             any failure raised by the DynamoDB client while describing,
	 *             scanning or deleting from the table
	 */
	public static void cleanupAggTable(AWSCredentialsProvider credentials,
			Region region, final String dynamoTable, final String toSeq)
			throws Exception {
		// Compare sequence values as arbitrary-precision decimals. A double
		// keeps only ~15-17 significant digits, so long sequence strings that
		// differ in their lower digits would otherwise compare as equal.
		// NOTE(review): presumably these are Kinesis sequence numbers - confirm.
		final java.math.BigDecimal deleteBelow = new java.math.BigDecimal(
				toSeq.trim());

		// A single client serves both the scan and the deletes: the deletes
		// are issued through the blocking deleteItem(table, key) form, so a
		// separate asynchronous client adds nothing.
		final AmazonDynamoDB dynamoClient = new AmazonDynamoDBClient(
				credentials);

		try {
			if (region != null) {
				dynamoClient.setRegion(region);
			}

			// work out what the key and date column name is
			String keyColumn = null;
			String dateColumn = null;

			List<KeySchemaElement> keySchema = dynamoClient
					.describeTable(dynamoTable).getTable().getKeySchema();
			for (KeySchemaElement element : keySchema) {
				if (KeyType.HASH.name().equals(element.getKeyType())) {
					keyColumn = element.getAttributeName();
				}

				if (KeyType.RANGE.name().equals(element.getKeyType())) {
					dateColumn = element.getAttributeName();
				}
			}

			LOG.info(String.format(
					"Deleting data from %s where %s values are below %s",
					dynamoTable, StreamAggregator.LAST_WRITE_SEQ, deleteBelow));

			int deleteCount = 0;
			int skippedCount = 0;
			Map<String, AttributeValue> lastKey = null;

			do {
				// read the next page of candidate items, fetching only the
				// key attributes and the sequence value
				ScanRequest scan = new ScanRequest().withTableName(dynamoTable)
						.withExclusiveStartKey(lastKey);
				if (dateColumn == null) {
					// table has no range key: do not request a null attribute
					scan.withAttributesToGet(keyColumn,
							StreamAggregator.LAST_WRITE_SEQ);
				} else {
					scan.withAttributesToGet(keyColumn, dateColumn,
							StreamAggregator.LAST_WRITE_SEQ);
				}

				ScanResult results = dynamoClient.scan(scan);

				// delete everything up to the system provided change number
				for (Map<String, AttributeValue> item : results.getItems()) {
					AttributeValue seqAttribute = item
							.get(StreamAggregator.LAST_WRITE_SEQ);
					String seqText = null;
					if (seqAttribute != null) {
						// accept the value whether it was stored as a string
						// or as a number
						seqText = seqAttribute.getS() != null ? seqAttribute
								.getS() : seqAttribute.getN();
					}
					if (seqText == null) {
						// nothing to compare against - leave the item alone
						skippedCount++;
						continue;
					}

					if (new java.math.BigDecimal(seqText.trim())
							.compareTo(deleteBelow) < 0) {
						// build the primary key only for items being removed
						Map<String, AttributeValue> deleteKey = new HashMap<>();
						deleteKey.put(keyColumn, item.get(keyColumn));
						if (dateColumn != null) {
							deleteKey.put(dateColumn, item.get(dateColumn));
						}

						dynamoClient.deleteItem(dynamoTable, deleteKey);
						deleteCount++;
					}
				}
				lastKey = results.getLastEvaluatedKey();
			} while (lastKey != null && !lastKey.isEmpty());

			if (skippedCount > 0) {
				LOG.info(String.format(
						"Skipped %s Records without a %s value",
						skippedCount, StreamAggregator.LAST_WRITE_SEQ));
			}
			LOG.info(String.format(
					"Operation Complete - %s Records removed from Aggregate Store",
					deleteCount));
		} finally {
			// release the client's connection resources on every exit path
			dynamoClient.shutdown();
		}
	}