in src/main/java/com/amazonaws/gdcreplication/lambda/ImportLargeTable.java [33:84]
/**
 * Lambda entry point: processes every SQS message in the event as a large-table import request.
 *
 * <p>Configuration is read from environment variables, each with a fallback default:
 * {@code region}, {@code target_glue_catalog_id}, {@code skip_archive} and
 * {@code ddb_name_table_import_status}.
 *
 * <p>For each message the body is taken as the table DDL and the message attributes
 * {@code ExportBatchId}, {@code SourceGlueDataCatalogId} and {@code SchemaType} are read
 * (attribute names are matched case-insensitively). Only messages whose schema type is
 * {@code largeTable} are handed to {@code processsRecord}.
 *
 * @param event   the SQS event carrying the batch of messages
 * @param context the Lambda invocation context, forwarded to {@code processsRecord}
 * @return the literal {@code "Success"} when every message in the batch was processed
 * @throws RuntimeException on the first message that is not processed, either because its
 *         schema type is not {@code largeTable} or because {@code processsRecord} returned
 *         {@code false}; remaining messages in the batch are not attempted
 */
public String handleRequest(SQSEvent event, Context context) {
    String region = Optional.ofNullable(System.getenv("region")).orElse(Regions.US_EAST_1.getName());
    String targetGlueCatalogId = Optional.ofNullable(System.getenv("target_glue_catalog_id")).orElse("1234567890");
    boolean skipTableArchive = Boolean
            .parseBoolean(Optional.ofNullable(System.getenv("skip_archive")).orElse("true"));
    String ddbTblNameForTableStatusTracking = Optional.ofNullable(System.getenv("ddb_name_table_import_status"))
            .orElse("ddb_name_table_import_status");

    // Print environment variables
    printEnvVariables(targetGlueCatalogId, skipTableArchive, ddbTblNameForTableStatusTracking, region);

    // Set client configuration
    ClientConfiguration cc = new ClientConfiguration();
    cc.setMaxErrorRetry(10);

    // Create Objects for Glue and SQS
    AWSGlue glue = AWSGlueClientBuilder.standard().withRegion(region).withClientConfiguration(cc).build();
    AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(region).withClientConfiguration(cc).build();

    // Iterate and process all the messages which are part of SQSEvent
    System.out.println("Number of messages in SQS Event: " + event.getRecords().size());
    for (SQSMessage msg : event.getRecords()) {
        String ddl = msg.getBody();
        String exportBatchId = "";
        String schemaType = "";
        String sourceGlueCatalogId = "";

        // Read Message Attributes; a message without an attribute map keeps the empty defaults
        if (msg.getMessageAttributes() != null) {
            for (Entry<String, MessageAttribute> entry : msg.getMessageAttributes().entrySet()) {
                if ("ExportBatchId".equalsIgnoreCase(entry.getKey())) {
                    exportBatchId = entry.getValue().getStringValue();
                    System.out.println("Export Batch Id: " + exportBatchId);
                } else if ("SourceGlueDataCatalogId".equalsIgnoreCase(entry.getKey())) {
                    sourceGlueCatalogId = entry.getValue().getStringValue();
                    System.out.println("Source Glue Data Cagalog Id: " + sourceGlueCatalogId);
                } else if ("SchemaType".equalsIgnoreCase(entry.getKey())) {
                    schemaType = entry.getValue().getStringValue();
                    System.out.println("Message Schema Type " + schemaType);
                }
            }
        }

        // The flag is scoped to this message so that a success on an earlier message
        // can never mask a later message that was skipped or failed.
        boolean recordProcessed = false;
        // Constant-first comparison stays safe if the attribute carried no string value.
        if ("largeTable".equalsIgnoreCase(schemaType)) {
            recordProcessed = processsRecord(context, glue, sqs, targetGlueCatalogId, ddbTblNameForTableStatusTracking,
                    ddl, skipTableArchive, exportBatchId, sourceGlueCatalogId, region);
        }
        if (!recordProcessed) {
            System.out.printf("Input message '%s' could not be processed. This is an exception. It will be reprocessed again. \n", ddl);
            // Failing the invocation is deliberate; per the log message above the input is
            // expected to be reprocessed (presumably via SQS redelivery - confirm queue setup).
            throw new RuntimeException("Message could not be processed as a large table (schema type: '"
                    + schemaType + "', export batch id: '" + exportBatchId + "')");
        }
    }
    return "Success";
}