in src/main/java/com/amazonaws/gdcreplication/lambda/ImportDatabaseOrTable.java [90:147]
public void processSNSEvent(Context context, List<SNSRecord> snsRecods, AWSGlue glue, AmazonSQS sqs,
String sqsQueueURL, String sqsQueueURLLargeTable, String targetGlueCatalogId,
String ddbTblNameForDBStatusTracking, String ddbTblNameForTableStatusTracking, boolean skipTableArchive,
String region) {
SQSUtil sqsUtil = new SQSUtil();
for (SNSRecord snsRecod : snsRecods) {
boolean isDatabaseType = false;
boolean isTableType = false;
boolean isLargeTable = false;
LargeTable largeTable = null;
Database db = null;
TableWithPartitions table = null;
Gson gson = new Gson();
String message = snsRecod.getSNS().getMessage();
context.getLogger().log("SNS Message Payload: " + message);
// Get message attributes from the SNS Payload
Map<String, MessageAttribute> msgAttributeMap = snsRecod.getSNS().getMessageAttributes();
MessageAttribute msgTypeAttr = msgAttributeMap.get("message_type");
MessageAttribute sourceCatalogIdAttr = msgAttributeMap.get("source_catalog_id");
MessageAttribute exportBatchIdAttr = msgAttributeMap.get("export_batch_id");
String sourceGlueCatalogId = sourceCatalogIdAttr.getValue();
String exportBatchId = exportBatchIdAttr.getValue();
context.getLogger().log("Message Type: " + msgTypeAttr.getValue());
context.getLogger().log("Source Catalog Id: " + sourceGlueCatalogId);
// Serialize JSON String based on the message type
try {
if (msgTypeAttr.getValue().equalsIgnoreCase("database")) {
db = gson.fromJson(message, Database.class);
isDatabaseType = true;
} else if (msgTypeAttr.getValue().equalsIgnoreCase("table")) {
table = gson.fromJson(message, TableWithPartitions.class);
isTableType = true;
} else if (msgTypeAttr.getValue().equalsIgnoreCase("largeTable")) {
largeTable = gson.fromJson(message, LargeTable.class);
isLargeTable = true;
}
} catch (JsonSyntaxException e) {
System.out.println("Cannot parse SNS message to Glue Database Type.");
e.printStackTrace();
}
// Execute the business logic based on the message type
GDCUtil gdcUtil = new GDCUtil();
if (isDatabaseType) {
gdcUtil.processDatabseSchema(glue, sqs, targetGlueCatalogId, db, message, sqsQueueURL, sourceGlueCatalogId,
exportBatchId, ddbTblNameForDBStatusTracking);
} else if (isTableType) {
gdcUtil.processTableSchema(glue, sqs, targetGlueCatalogId, sourceGlueCatalogId, table, message,
ddbTblNameForTableStatusTracking, sqsQueueURL, exportBatchId, skipTableArchive);
} else if (isLargeTable) {
sqsUtil.sendLargeTableSchemaToSQS(sqs, sqsQueueURLLargeTable, exportBatchId, sourceGlueCatalogId,
message, largeTable);
}
}
}