in src/main/java/com/amazonaws/gdcreplication/util/SNSUtil.java [89:128]
/**
 * Converts each database in {@code masterDBList} to a JSON string with Gson and
 * publishes it to the SNS topic, then passes the outcome of every publish attempt
 * to {@code ddbUtil.trackDatabaseExportStatus}.
 *
 * <p>Every message carries three attributes: {@code source_catalog_id},
 * {@code message_type} (always {@code "database"}) and {@code export_batch_id}.
 * The batch id is the string form of the run id, which is the wall-clock time in
 * milliseconds taken once when this method starts, so all messages published by
 * one call share the same batch id.
 *
 * <p>A publish failure for one database does not stop the loop: the failure is
 * printed, reported to {@code ddbUtil} with an empty message id and {@code false},
 * and the next database is processed.
 *
 * @param sns                 client used to publish the messages
 * @param masterDBList        databases to publish; must not be {@code null}
 * @param snsTopicArn         ARN of the target SNS topic
 * @param ddbUtil             helper that receives the per-database export status
 *                            (presumably writes an audit row to DynamoDB; confirm
 *                            against {@code DDBUtil})
 * @param ddbTblName          table name handed through to {@code ddbUtil}
 * @param sourceGlueCatalogId catalog id sent as a message attribute and handed
 *                            through to {@code ddbUtil}
 * @return the number of databases whose publish call completed without throwing
 */
public int publishDatabaseSchemasToSNS(AmazonSNS sns, List<Database> masterDBList, String snsTopicArn,
        DDBUtil ddbUtil, String ddbTblName, String sourceGlueCatalogId) {
    long exportRunId = System.currentTimeMillis();
    String exportBatchId = Long.toString(exportRunId);
    int numberOfDatabasesExported = 0;

    // Message attribute values are identical for every message in this run.
    MessageAttributeValue sourceCatalogIdMA = createStringAttribute(sourceGlueCatalogId);
    MessageAttributeValue msgTypeMA = createStringAttribute("database");
    MessageAttributeValue exportBatchIdMA = createStringAttribute(exportBatchId);

    // One Gson instance is enough for the whole run; no need to rebuild it per database.
    Gson gson = new Gson();

    for (Database db : masterDBList) {
        // Convert Glue Database to JSON String
        String databaseDDL = gson.toJson(db);

        // Build the publish request with a fresh attribute map per request.
        PublishRequest publishRequest = new PublishRequest(snsTopicArn, databaseDDL);
        Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>();
        messageAttributes.put("source_catalog_id", sourceCatalogIdMA);
        messageAttributes.put("message_type", msgTypeMA);
        messageAttributes.put("export_batch_id", exportBatchIdMA);
        publishRequest.setMessageAttributes(messageAttributes);

        // Only the publish itself is guarded here, so that a later failure while
        // recording the status is never misreported as a failed publish.
        String messageId;
        try {
            messageId = sns.publish(publishRequest).getMessageId();
        } catch (Exception e) {
            e.printStackTrace();
            System.out.printf(
                    "Schema for Database '%s' could not be published to SNS Topic. It will be audited in DynamoDB table. \n",
                    db.getName());
            ddbUtil.trackDatabaseExportStatus(ddbTblName, db.getName(), databaseDDL, "", sourceGlueCatalogId,
                    exportRunId, exportBatchId, false);
            continue;
        }

        numberOfDatabasesExported++;
        System.out.printf("Schema for Database '%s' published to SNS Topic. Message_Id: %s. \n",
                db.getName(), messageId);

        // The message is already delivered at this point. If recording the status
        // fails, report exactly that and keep going instead of marking the
        // database as not published.
        try {
            ddbUtil.trackDatabaseExportStatus(ddbTblName, db.getName(), databaseDDL, messageId,
                    sourceGlueCatalogId, exportRunId, exportBatchId, true);
        } catch (Exception e) {
            e.printStackTrace();
            System.out.printf(
                    "Schema for Database '%s' was published to SNS Topic but its export status could not be recorded. \n",
                    db.getName());
        }
    }

    System.out.println("Number of databases exported to SNS: " + numberOfDatabasesExported);
    return numberOfDatabasesExported;
}