in src/main/java/com/amazonaws/gdcreplication/lambda/ExportDatabaseWithTables.java [120:233]
/**
 * Exports the Glue databases announced by a batch of SNS records, together with their tables.
 *
 * <p>For every record whose {@code message_type} attribute equals {@code database} (case-insensitive),
 * whose payload parses to a Glue {@link Database}, and which carries an {@code export_batch_id} attribute:
 * <ol>
 *   <li>the database is looked up with {@code GlueUtil.getDatabaseIfExist};</li>
 *   <li>if it exists, the original payload is re-published to {@code topicArn} and the outcome
 *       (message id or failure) is recorded in {@code ddbTblNameForDBStatusTracking};</li>
 *   <li>each table of the database is fetched with its partitions. Tables with at most
 *       {@code partitionThreshold} partitions are serialized (table + partitions) and published to
 *       {@code topicArn}, and one status item per such table is written to
 *       {@code ddbTblNameForTableStatusTracking}. Tables above the threshold are sent to
 *       {@code sqsQueue4LargePartTables} instead.</li>
 * </ol>
 * Records that are missing required attributes, cannot be parsed, or name a database that is not
 * found are logged and skipped. A single export run id (the invocation start time in millis) is
 * shared by every record processed in this call.
 *
 * @param snsRecods SNS records to process; each payload is expected to be a JSON-serialized Glue Database
 * @param context Lambda context, used for logging
 * @param glue Glue client used to look up the database, its tables and their partitions
 * @param sns SNS client used to publish database and table schemas
 * @param sqs SQS client used for tables that exceed {@code partitionThreshold}
 * @param sourceGlueCatalogId id of the Glue catalog being exported
 * @param ddbTblNameForDBStatusTracking DynamoDB table receiving per-database export status
 * @param ddbTblNameForTableStatusTracking DynamoDB table receiving per-table export status
 * @param topicArn SNS topic to which schemas are published
 * @param sqsQueue4LargePartTables SQS queue identifier (as expected by {@code SQSUtil}) for large tables
 * @param partitionThreshold tables with more partitions than this are routed to SQS instead of SNS
 */
public static void processSNSEvent(List<SNSRecord> snsRecods, Context context, AWSGlue glue, AmazonSNS sns,
        AmazonSQS sqs, String sourceGlueCatalogId, String ddbTblNameForDBStatusTracking,
        String ddbTblNameForTableStatusTracking, String topicArn, String sqsQueue4LargePartTables,
        int partitionThreshold) {
    Gson gson = new Gson();
    DDBUtil ddbUtil = new DDBUtil();
    SNSUtil snsUtil = new SNSUtil();
    GlueUtil glueUtil = new GlueUtil();
    SQSUtil sqsUtil = new SQSUtil();
    // One run id is shared by every record handled in this invocation.
    long exportRunId = System.currentTimeMillis();
    for (SNSRecord snsRecod : snsRecods) {
        String databaseDDL = snsRecod.getSNS().getMessage();
        context.getLogger().log("SNS Message Payload: " + databaseDDL);
        Map<String, MessageAttribute> msgAttributeMap = snsRecod.getSNS().getMessageAttributes();
        String messageType = readMessageAttribute(msgAttributeMap, "message_type");
        String exportBatchId = readMessageAttribute(msgAttributeMap, "export_batch_id");
        context.getLogger().log("Message Attribute value: " + messageType);

        // Convert Message to Glue Database Type. Declared per record so a database parsed from an
        // earlier record can never be reused for this one.
        Database db = null;
        if ("database".equalsIgnoreCase(messageType)) {
            try {
                db = gson.fromJson(databaseDDL, Database.class);
            } catch (JsonSyntaxException e) {
                System.out.println("Cannot parse SNS message to Glue Database Type.");
                context.getLogger().log("JSON parse error: " + e);
            }
        }
        // Gson returns null (rather than throwing) for an empty payload, so check the result too.
        if (db == null) {
            System.out.println(
                    "Message received from SNS Topic seems to be invalid. It could not be converted to Glue Database Type.");
            continue;
        }
        if (exportBatchId == null) {
            System.out.println("Message received from SNS Topic has no 'export_batch_id' attribute. Skipping it.");
            continue;
        }

        // Check if a database exist in Glue
        Database database = glueUtil.getDatabaseIfExist(glue, sourceGlueCatalogId, db);
        if (database == null) {
            // 'database' is null here, so report the name that was requested in the message.
            System.out.printf(
                    "There is no Database with name '%s' exist in Glue Data Catalog. Tables cannot be retrieved. \n",
                    db.getName());
            continue;
        }

        PublishResult publishDBResponse = snsUtil.publishDatabaseSchemaToSNS(sns, topicArn, databaseDDL,
                sourceGlueCatalogId, exportBatchId);
        String dbMessageId = publishedMessageId(publishDBResponse);
        if (dbMessageId != null) {
            System.out.println("Database schema published to SNS Topic. Message_Id: " + dbMessageId);
            ddbUtil.trackDatabaseExportStatus(ddbTblNameForDBStatusTracking, db.getName(), databaseDDL,
                    dbMessageId, sourceGlueCatalogId, exportRunId, exportBatchId, true);
        } else {
            ddbUtil.trackDatabaseExportStatus(ddbTblNameForDBStatusTracking, db.getName(), databaseDDL, "",
                    sourceGlueCatalogId, exportRunId, exportBatchId, false);
        }

        // Get Tables for a given Database
        List<Table> dbTableList = glueUtil.getTables(glue, sourceGlueCatalogId, database.getName());
        List<WriteRequest> itemList = new ArrayList<WriteRequest>();
        int numberOfTablesExported = 0;
        for (Table table : dbTableList) {
            List<Partition> partitionList = glueUtil.getPartitions(glue, sourceGlueCatalogId,
                    table.getDatabaseName(), table.getName());
            System.out.printf("Database: %s, Table: %s, num_partitions: %d \n", table.getDatabaseName(),
                    table.getName(), partitionList.size());
            if (partitionList.size() <= partitionThreshold) {
                TableWithPartitions tableWithParts = new TableWithPartitions();
                tableWithParts.setPartitionList(partitionList);
                tableWithParts.setTable(table);
                // Convert Table to JSON String and publish it to the SNS topic.
                String tableDDL = gson.toJson(tableWithParts);
                PublishResult publishTableResponse = snsUtil.publishTableSchemaToSNS(sns, topicArn, table,
                        tableDDL, sourceGlueCatalogId, exportBatchId);
                String tableMessageId = publishedMessageId(publishTableResponse);
                if (tableMessageId != null) {
                    numberOfTablesExported++;
                }
                Map<String, AttributeValue> item = buildTableStatusItem(table, tableDDL, exportRunId,
                        exportBatchId, sourceGlueCatalogId, tableMessageId);
                itemList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
            } else {
                // Too many partitions to ship in one SNS message: hand the table off to SQS.
                LargeTable largeTable = new LargeTable();
                largeTable.setTable(table);
                largeTable.setLargeTable(true);
                largeTable.setNumberOfPartitions(partitionList.size());
                largeTable.setCatalogId(sourceGlueCatalogId);
                System.out.println("This will be sent to SQS Queue for further processing.");
                sqsUtil.sendTableSchemaToSQSQueue(sqs, sqsQueue4LargePartTables, largeTable, exportBatchId,
                        sourceGlueCatalogId);
            }
        }
        System.out.printf("Inserting Table statistics to DynamoDB for database: %s \n", database.getName());
        ddbUtil.insertIntoDynamoDB(itemList, ddbTblNameForTableStatusTracking);
        System.out.printf(
                "Table export statistics: number of tables exist in Database = %d, number of tables exported to SNS = %d. \n",
                dbTableList.size(), numberOfTablesExported);
    }
}

/**
 * Returns the value of the named SNS message attribute, or {@code null} when the attribute map or the
 * attribute itself is absent.
 */
private static String readMessageAttribute(Map<String, MessageAttribute> attributes, String name) {
    if (attributes == null) {
        return null;
    }
    MessageAttribute attribute = attributes.get(name);
    return attribute == null ? null : attribute.getValue();
}

/** Returns the SNS message id from a publish result, or {@code null} if there is no result or no id. */
private static String publishedMessageId(PublishResult result) {
    return result == null ? null : result.getMessageId();
}

/**
 * Builds the DynamoDB status-tracking item for a table that was published to SNS.
 *
 * @param snsMessageId message id returned by SNS, or {@code null} if publishing yielded no id; in that
 *     case {@code sns_msg_id} is stored as an empty string and {@code is_exported} as {@code "false"}
 */
private static Map<String, AttributeValue> buildTableStatusItem(Table table, String tableDDL, long exportRunId,
        String exportBatchId, String sourceGlueCatalogId, String snsMessageId) {
    boolean exported = snsMessageId != null;
    Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
    item.put("table_id", new AttributeValue().withS(table.getName().concat("|").concat(table.getDatabaseName())));
    item.put("export_run_id", new AttributeValue().withN(Long.toString(exportRunId)));
    item.put("export_batch_id", new AttributeValue().withS(exportBatchId));
    item.put("source_glue_catalog_id", new AttributeValue().withS(sourceGlueCatalogId));
    item.put("table_schema", new AttributeValue().withS(tableDDL));
    item.put("is_large_table", new AttributeValue().withS(Boolean.toString(false)));
    item.put("sns_msg_id", new AttributeValue().withS(exported ? snsMessageId : ""));
    item.put("is_exported", new AttributeValue().withS(Boolean.toString(exported)));
    return item;
}