in src/main/java/software/aws/glue/tableversions/lambda/TableVersionsCleanupLambda.java [83:143]
public void processEvent(AWSGlue glueClient, AmazonDynamoDB dynamoDBClient, List<SQSMessage> sqsMessages,
int numberofVersionsToRetain, String dynamoDBTableName, String hashKey, String rangeKey) {
DDBUtil ddbUtil = new DDBUtil();
GlueUtil glueUtil = new GlueUtil();
List<TableVersionStatus> tblVersionsNotDeletedMasterList = new ArrayList<TableVersionStatus>();
for (SQSMessage sqsMessage : sqsMessages) {
long executionId = System.currentTimeMillis();
// get Execution Batch Id from Message Attributes
String executionBatchId = "";
for (Entry<String, MessageAttribute> entry : sqsMessage.getMessageAttributes().entrySet()) {
if ("ExecutionBatchId".equalsIgnoreCase(entry.getKey())) {
executionBatchId = entry.getValue().getStringValue();
System.out.println("Execution Batch Id: " + executionBatchId);
}
}
// de-serialize SQS message to GlueTable
Gson gson = new Gson();
String message = new String(sqsMessage.getBody());
GlueTable glueTable = gson.fromJson(message, GlueTable.class);
System.out.printf("Process event for table '%s' under database '%s' \n", glueTable.getTableName(),
glueTable.getDatabaseName());
// get table versions
List<TableVersion> tableVersionList = glueUtil.getTableVersions(glueClient, glueTable.getTableName(),
glueTable.getDatabaseName());
if (tableVersionList.size() > numberofVersionsToRetain) {
// identify the versions that are older than numberofVersionsToRetain
List<List<Integer>> lists = glueUtil.determineOldVersions(tableVersionList, glueTable.getTableName(),
glueTable.getDatabaseName(), numberofVersionsToRetain);
List<Integer> versionsToKeep = lists.get(0);
List<Integer> versionsToDelete = lists.get(1);
System.out.printf("For table '%s', versions to be deleted: %d, versions to be retaind: %d \n",
glueTable.getTableName(), versionsToDelete.size(), versionsToKeep.size());
// delete older versions
List<TableVersionStatus> tblVersionsNotDeletedList = glueUtil.deleteTableVersions(glueClient,
versionsToDelete, glueTable.getTableName(), glueTable.getDatabaseName());
int numTableVersionsB4Cleanup = tableVersionList.size();
int numDeletedVersions = versionsToDelete.size() - tblVersionsNotDeletedList.size();
boolean itemInserted = ddbUtil.insertCleanupStatusToDynamoDB(dynamoDBClient, dynamoDBTableName, hashKey,
rangeKey, executionId, executionBatchId, glueTable.getDatabaseName(), glueTable.getTableName(),
numTableVersionsB4Cleanup, versionsToKeep.size(), numDeletedVersions);
if (tblVersionsNotDeletedList.size() == 0)
System.out.printf("Older versions of table '%s' under database '%s' were deleted. \n",
glueTable.getTableName(), glueTable.getDatabaseName());
else
tblVersionsNotDeletedMasterList.addAll(tblVersionsNotDeletedList);
} else {
System.out.printf("Table '%s' does not have more than %d versions. Skipping. \n",
glueTable.getTableName(), numberofVersionsToRetain);
}
}
}