in src/main/java/software/aws/glue/tableversions/lambda/TableVersionsCleanupPlannerLambda.java [49:120]
public String handleRequest(Object input, Context context) {
String separator = Optional.ofNullable(System.getenv("separator")).orElse("$");
String region = Optional.ofNullable(System.getenv("region")).orElse(Regions.US_EAST_1.getName());
String databaseNamesStringLiteral = Optional.ofNullable(System.getenv("database_names_string_literal"))
.orElse("database_1$database_2");
String sqsQueueURI = Optional.ofNullable(System.getenv("sqs_queue_url"))
.orElse("https://sqs.us-east-1.amazonaws.com/1234567890/table_versions_cleanup_planner_queue.fifo");
String ddbTableName = Optional.ofNullable(System.getenv("ddb_table_name"))
.orElse("glue_table_version_cleanup_planner");
String hashKey = Optional.ofNullable(System.getenv("hash_key")).orElse("execution_batch_id");
String rangeKey = Optional.ofNullable(System.getenv("range_key")).orElse("database_name_table_name");
long executionBatchId = System.currentTimeMillis();
AWSSecurityTokenService client = AWSSecurityTokenServiceClientBuilder.standard().build();
GetCallerIdentityRequest request = new GetCallerIdentityRequest();
GetCallerIdentityResult response = client.getCallerIdentity(request);
String homeCatalogId = response.getAccount();
context.getLogger().log("Catalog Id: " + homeCatalogId);
context.getLogger().log("Input: " + input);
printEnvVariables(sqsQueueURI, databaseNamesStringLiteral, separator, region, ddbTableName, hashKey, rangeKey);
// Create objects for AWS Glue and Amazon SQS
AWSGlue glue = AWSGlueClientBuilder.standard().withRegion(region).build();
AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(region).build();
AmazonDynamoDB dynamoDBClient = AmazonDynamoDBClientBuilder.standard().withRegion(region).build();
DDBUtil ddbUtil = new DDBUtil();
SQSUtil sqsUtil = new SQSUtil();
GlueUtil glueUtil = new GlueUtil();
List<String> databaseNames = new ArrayList<String>();
List<Database> databaseList = new ArrayList<Database>();
AtomicInteger numberOfTablesExported = new AtomicInteger();
// When list of databases are provided as a token separated values then the
// cleanup process will be initiated for those databases.
// else, it imports the cleanup process will be initiated for all databases
if (databaseNamesStringLiteral.equalsIgnoreCase("")) {
databaseList = glueUtil.getDatabases(glue, homeCatalogId);
} else {
databaseNames = tokenizeStrings(databaseNamesStringLiteral, separator);
for (String databaseName : databaseNames) {
Database database = glueUtil.getDatabase(glue, homeCatalogId, databaseName);
if (Optional.ofNullable(database).isPresent())
databaseList.add(database);
}
}
List<Table> tableList = glueUtil.getTables(glue, databaseList, homeCatalogId);
for (Table table : tableList) {
GlueTable tableMessage = new GlueTable();
tableMessage.setDatabaseName(table.getDatabaseName());
tableMessage.setTableName(table.getName());
Gson gson = new Gson();
String message = gson.toJson(tableMessage);
// Write a message to Amazon SQS queue.
boolean messageSentToSQS = sqsUtil.sendTableSchemaToSQSQueue(sqs, sqsQueueURI, message, executionBatchId, table.getDatabaseName());
if (messageSentToSQS) {
String messageSentTime = new Date().toString();
numberOfTablesExported.incrementAndGet();
ddbUtil.insertTableDetailsToDynamoDB(dynamoDBClient, ddbTableName, hashKey, rangeKey, executionBatchId,
table.getDatabaseName(), table.getName(), messageSentTime);
}
}
System.out.printf("Number of messages written to SQS Queue: %d \n", numberOfTablesExported.get());
return "TableVersionsCleanupPlannerLambda completed successfully!";
}