in src/main/java/com/amazonaws/gdcreplication/lambda/DLQImportDatabaseOrTable.java [29:87]
/**
 * Lambda entry point: forwards every message of the incoming SQS event to
 * {@code processsRecord}.
 *
 * <p>Configuration is read from environment variables, each with a fallback
 * default: {@code region}, {@code target_glue_catalog_id}, {@code skip_archive},
 * {@code ddb_name_db_import_status}, {@code ddb_name_table_import_status} and
 * {@code dlq_url_sqs}.
 *
 * <p>For each message the body is treated as the schema payload, and the message
 * attributes {@code ExportBatchId}, {@code SourceGlueDataCatalogId} and
 * {@code SchemaType} (attribute names matched case-insensitively) are extracted.
 * A message is flagged as a table when {@code SchemaType} equals "Table",
 * ignoring case; a missing attribute or one without a string value is treated
 * as "not a table".
 *
 * @param event   SQS event whose records are processed one by one
 * @param context Lambda context, passed through to {@code processsRecord}
 * @return the literal string {@code "Success"} once all records have been handed off
 */
public String handleRequest(SQSEvent event, Context context) {
    // Resolve configuration from the environment, falling back to defaults.
    String region = Optional.ofNullable(System.getenv("region")).orElse(Regions.US_EAST_1.getName());
    String targetGlueCatalogId = Optional.ofNullable(System.getenv("target_glue_catalog_id")).orElse("1234567890");
    boolean skipTableArchive = Boolean
            .parseBoolean(Optional.ofNullable(System.getenv("skip_archive")).orElse("true"));
    String ddbTblNameForDBStatusTracking = Optional.ofNullable(System.getenv("ddb_name_db_import_status"))
            .orElse("ddb_name_db_import_status");
    String ddbTblNameForTableStatusTracking = Optional.ofNullable(System.getenv("ddb_name_table_import_status"))
            .orElse("ddb_name_table_import_status");
    String sqsQueueURL = Optional.ofNullable(System.getenv("dlq_url_sqs")).orElse("");

    // Print environment variables
    printEnvVariables(targetGlueCatalogId, skipTableArchive, ddbTblNameForDBStatusTracking,
            ddbTblNameForTableStatusTracking, sqsQueueURL, region);

    // Set client configuration
    ClientConfiguration cc = new ClientConfiguration();
    cc.setMaxErrorRetry(10);

    // Create Objects for Glue and SQS
    AWSGlue glue = AWSGlueClientBuilder.standard().withRegion(region).withClientConfiguration(cc).build();
    AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(region).withClientConfiguration(cc).build();

    // Iterate and process all the messages which are part of SQSEvent
    System.out.println("Number of messages in SQS Event: " + event.getRecords().size());
    for (SQSMessage msg : event.getRecords()) {
        // The body is already an immutable String; no defensive copy is needed.
        String ddl = msg.getBody();
        String exportBatchId = "";
        String sourceGlueCatalogId = "";
        String schemaType = "";

        // Read Message Attributes. The attribute map may be absent (for example in a
        // hand-built test event), so guard against null before iterating.
        if (msg.getMessageAttributes() != null) {
            for (Entry<String, MessageAttribute> entry : msg.getMessageAttributes().entrySet()) {
                if ("ExportBatchId".equalsIgnoreCase(entry.getKey())) {
                    exportBatchId = entry.getValue().getStringValue();
                    System.out.println("Export Batch Id: " + exportBatchId);
                } else if ("SourceGlueDataCatalogId".equalsIgnoreCase(entry.getKey())) {
                    sourceGlueCatalogId = entry.getValue().getStringValue();
                    System.out.println("Source Glue Data Catalog Id: " + sourceGlueCatalogId);
                } else if ("SchemaType".equalsIgnoreCase(entry.getKey())) {
                    schemaType = entry.getValue().getStringValue();
                    System.out.println("Message Schema Type " + schemaType);
                }
            }
        }
        System.out.println("Schema: " + ddl);

        // Constant-first comparison: schemaType is null when the attribute carries no
        // string value, and that must not raise a NullPointerException.
        boolean isTable = "Table".equalsIgnoreCase(schemaType);

        processsRecord(context, glue, sqs, sqsQueueURL, targetGlueCatalogId, ddbTblNameForDBStatusTracking,
                ddbTblNameForTableStatusTracking, ddl, skipTableArchive, exportBatchId, sourceGlueCatalogId,
                isTable);
    }
    return "Success";
}