public void processEvent()

in src/main/java/software/aws/glue/tableversions/lambda/TableVersionsCleanupLambda.java [83:143]


	public void processEvent(AWSGlue glueClient, AmazonDynamoDB dynamoDBClient, List<SQSMessage> sqsMessages,
			int numberofVersionsToRetain, String dynamoDBTableName, String hashKey, String rangeKey) {

		DDBUtil ddbUtil = new DDBUtil();
		GlueUtil glueUtil = new GlueUtil();
		List<TableVersionStatus> tblVersionsNotDeletedMasterList = new ArrayList<TableVersionStatus>();

		for (SQSMessage sqsMessage : sqsMessages) {
			long executionId = System.currentTimeMillis();
			// get Execution Batch Id from Message Attributes
			String executionBatchId = "";
			for (Entry<String, MessageAttribute> entry : sqsMessage.getMessageAttributes().entrySet()) {
				if ("ExecutionBatchId".equalsIgnoreCase(entry.getKey())) {
					executionBatchId = entry.getValue().getStringValue();
					System.out.println("Execution Batch Id: " + executionBatchId);
				}
			}

			// de-serialize SQS message to GlueTable
			Gson gson = new Gson();
			String message = new String(sqsMessage.getBody());
			GlueTable glueTable = gson.fromJson(message, GlueTable.class);
			System.out.printf("Process event for table '%s' under database '%s' \n", glueTable.getTableName(),
					glueTable.getDatabaseName());

			// get table versions
			List<TableVersion> tableVersionList = glueUtil.getTableVersions(glueClient, glueTable.getTableName(),
					glueTable.getDatabaseName());

			if (tableVersionList.size() > numberofVersionsToRetain) {
				// identify the versions that are older than numberofVersionsToRetain
				List<List<Integer>> lists = glueUtil.determineOldVersions(tableVersionList, glueTable.getTableName(),
						glueTable.getDatabaseName(), numberofVersionsToRetain);
				List<Integer> versionsToKeep = lists.get(0);
				List<Integer> versionsToDelete = lists.get(1);

				System.out.printf("For table '%s', versions to be deleted: %d, versions to be retaind: %d \n",
						glueTable.getTableName(), versionsToDelete.size(), versionsToKeep.size());

				// delete older versions
				List<TableVersionStatus> tblVersionsNotDeletedList = glueUtil.deleteTableVersions(glueClient,
						versionsToDelete, glueTable.getTableName(), glueTable.getDatabaseName());

				int numTableVersionsB4Cleanup = tableVersionList.size();
				int numDeletedVersions = versionsToDelete.size() - tblVersionsNotDeletedList.size();

				boolean itemInserted = ddbUtil.insertCleanupStatusToDynamoDB(dynamoDBClient, dynamoDBTableName, hashKey,
						rangeKey, executionId, executionBatchId, glueTable.getDatabaseName(), glueTable.getTableName(),
						numTableVersionsB4Cleanup, versionsToKeep.size(), numDeletedVersions);

				if (tblVersionsNotDeletedList.size() == 0)
					System.out.printf("Older versions of table '%s' under database '%s' were deleted. \n",
							glueTable.getTableName(), glueTable.getDatabaseName());
				else
					tblVersionsNotDeletedMasterList.addAll(tblVersionsNotDeletedList);
			} else {
				System.out.printf("Table '%s' does not have more than %d versions. Skipping. \n",
						glueTable.getTableName(), numberofVersionsToRetain);
			}
		}
	}