in src/main/java/com/amazonaws/gdcreplication/lambda/ExportLargeTable.java [37:143]
public String handleRequest(SQSEvent event, Context context) {
String region = Optional.ofNullable(System.getenv("region")).orElse(Regions.US_EAST_1.getName());
String topicArn = Optional.ofNullable(System.getenv("sns_topic_arn_export_dbs_tables"))
.orElse("arn:aws:sns:us-east-1:1234567890:GlueExportSNSTopic");
String bucketName = Optional.ofNullable(System.getenv("s3_bucket_name")).orElse("");
String ddbTblNameForTableStatusTracking = Optional.ofNullable(System.getenv("ddb_name_table_export_status"))
.orElse("ddb_name_table_export_status");
// Set client configuration
ClientConfiguration cc = new ClientConfiguration();
cc.setMaxErrorRetry(10);
// Create Objects for Glue and SQS
AWSGlue glue = AWSGlueClientBuilder.standard().withRegion(region).withClientConfiguration(cc).build();
AmazonSNS sns = AmazonSNSClientBuilder.standard().withRegion(region).build();
// // Create Objects for Utility classes
DDBUtil ddbUtil = new DDBUtil();
GlueUtil glueUtil = new GlueUtil();
S3Util s3Util = new S3Util();
SNSUtil snsUtil = new SNSUtil();
String objectKey = "";
LargeTable largeTable = null;
boolean recordProcessed = false;
boolean objectCreated = false;
/**
* Iterate and process all the messages which are part of SQSEvent
*/
System.out.println("Number of messages in SQS Event: " + event.getRecords().size());
for (SQSMessage msg : event.getRecords()) {
String payLoad = new String(msg.getBody());
String exportBatchId = "";
String sourceGlueCatalogId = "";
String messageType = "";
Gson gson = new Gson();
long exportRunId = System.currentTimeMillis();
// Read Message Attributes
for (Entry<String, MessageAttribute> entry : msg.getMessageAttributes().entrySet()) {
if ("ExportBatchId".equalsIgnoreCase(entry.getKey())) {
exportBatchId = entry.getValue().getStringValue();
System.out.println("Export Batch Id: " + exportBatchId);
} else if ("SourceGlueDataCatalogId".equalsIgnoreCase(entry.getKey())) {
sourceGlueCatalogId = entry.getValue().getStringValue();
System.out.println("Source Glue Data Cagalog Id: " + sourceGlueCatalogId);
} else if ("SchemaType".equalsIgnoreCase(entry.getKey())) {
messageType = entry.getValue().getStringValue();
System.out.println("Message Type " + messageType);
}
}
if (messageType.equalsIgnoreCase("largeTable")) {
largeTable = gson.fromJson(payLoad, LargeTable.class);
if (largeTable.isLargeTable()) {
// Create object key
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
StringBuilder date = new StringBuilder(simpleDateFormat.format(new Date()));
objectKey = date.append("_").append(Long.toString(System.currentTimeMillis())).append("_")
.append(sourceGlueCatalogId).append("_").append(largeTable.getTable().getDatabaseName())
.append("_").append(largeTable.getTable().getName()).append(".txt").toString();
String content = getPartitionsAndCreateObjectContent(context, glue, glueUtil, sourceGlueCatalogId, largeTable, exportBatchId);
objectCreated = s3Util.createS3Object(region, bucketName, objectKey, content);
}
PublishResult publishResponse = null;
String largeTableJSON = "";
// Send S3 object key to SNS Topic
if (objectCreated && !objectKey.equalsIgnoreCase("")) {
largeTable.setS3ObjectKey(objectKey);
largeTable.setS3BucketName(bucketName);
largeTableJSON = gson.toJson(largeTable);
System.out.println("Large Table JSON: " + largeTableJSON);
publishResponse = snsUtil.publishLargeTableSchemaToSNS(sns, topicArn, region, bucketName, largeTableJSON,
sourceGlueCatalogId, exportBatchId, messageType);
if(Optional.ofNullable(publishResponse).isPresent()) {
System.out.println("Large Table Schema Published to SNS Topic. Message Id: " + publishResponse.getMessageId());
recordProcessed = true;
}
}
// track status in DDB
if (Optional.ofNullable(publishResponse).isPresent()) {
ddbUtil.trackTableExportStatus(ddbTblNameForTableStatusTracking,
largeTable.getTable().getDatabaseName(), largeTable.getTable().getName(), largeTableJSON,
publishResponse.getMessageId(), sourceGlueCatalogId, exportRunId, exportBatchId, true, true,
bucketName, objectKey);
} else {
ddbUtil.trackTableExportStatus(ddbTblNameForTableStatusTracking,
largeTable.getTable().getDatabaseName(), largeTable.getTable().getName(), largeTableJSON,
publishResponse.getMessageId(), sourceGlueCatalogId, exportRunId, exportBatchId, false, true,
null, null);
}
}
}
if (!recordProcessed) {
System.out.printf(
"Schema for table '%s' of database '%s' could not be exported. This is an exception. It will be retried again. \n",
largeTable.getTable().getName(), largeTable.getTable().getDatabaseName());
throw new RuntimeException();
}
return "Success";
}