public int publishDatabaseSchemasToSNS()

in src/main/java/com/amazonaws/gdcreplication/util/SNSUtil.java [89:128]


	/**
	 * Serializes each database to JSON, publishes it to the given SNS topic and
	 * records the outcome of every publish attempt through {@code ddbUtil}.
	 *
	 * <p>
	 * All messages of one invocation share the same export batch id, which is the
	 * string form of the run id taken from {@link System#currentTimeMillis()} at
	 * the start of the call. Each message carries the attributes
	 * {@code source_catalog_id}, {@code message_type} (always {@code "database"})
	 * and {@code export_batch_id}.
	 *
	 * <p>
	 * A publish failure does not stop the batch: the stack trace is printed, the
	 * database is audited with an empty message id and a {@code false} status, and
	 * processing continues with the next database. A failure while writing the
	 * audit record is reported but neither changes the publish outcome nor stops
	 * the batch.
	 *
	 * @param sns                 SNS client used to publish the messages
	 * @param masterDBList        databases whose schemas are published
	 * @param snsTopicArn         ARN of the target SNS topic
	 * @param ddbUtil             helper used to record the export status
	 * @param ddbTblName          name of the table passed to the status tracker
	 * @param sourceGlueCatalogId catalog id sent as a message attribute and stored
	 *                            with the status record
	 * @return number of databases that were successfully published to SNS
	 */
	public int publishDatabaseSchemasToSNS(AmazonSNS sns, List<Database> masterDBList, String snsTopicArn,
			DDBUtil ddbUtil, String ddbTblName, String sourceGlueCatalogId) {
		long exportRunId = System.currentTimeMillis();
		String exportBatchId = Long.toString(exportRunId);
		int numberOfDatabasesExported = 0;
		// Create Message Attributes (identical for every message of this run)
		MessageAttributeValue sourceCatalogIdMA = createStringAttribute(sourceGlueCatalogId);
		MessageAttributeValue msgTypeMA = createStringAttribute("database");
		MessageAttributeValue exportBatchIdMA = createStringAttribute(exportBatchId);
		// One Gson instance is enough for the whole batch
		Gson gson = new Gson();
		// Convert databases to JSON Messages and publish them to SNS Topic
		for (Database db : masterDBList) {
			// Convert Glue Database to JSON String
			String databaseDDL = gson.toJson(db);
			// Build the publish request; each request gets its own attribute map
			PublishRequest publishRequest = new PublishRequest(snsTopicArn, databaseDDL);
			Map<String, MessageAttributeValue> messageAttributes = new HashMap<String, MessageAttributeValue>();
			messageAttributes.put("source_catalog_id", sourceCatalogIdMA);
			messageAttributes.put("message_type", msgTypeMA);
			messageAttributes.put("export_batch_id", exportBatchIdMA);
			publishRequest.setMessageAttributes(messageAttributes);

			// Only the publish itself decides success or failure. Keeping the
			// audit write out of this try block prevents a tracking error from
			// being reported (and audited) as a failed publish.
			String messageId = "";
			boolean published = false;
			try {
				PublishResult publishResponse = sns.publish(publishRequest);
				messageId = publishResponse.getMessageId();
				published = true;
				numberOfDatabasesExported++;
				System.out.printf("Schema for Database '%s' published to SNS Topic. Message_Id: %s. \n",
						db.getName(), messageId);
			} catch (Exception e) {
				e.printStackTrace();
				System.out.printf(
						"Schema for Database '%s' could not be published to SNS Topic. It will be audited in DynamoDB table. \n",
						db.getName());
			}

			// Audit the real outcome. A tracking failure is reported but must not
			// abort the remaining databases of the batch.
			try {
				ddbUtil.trackDatabaseExportStatus(ddbTblName, db.getName(), databaseDDL, messageId,
						sourceGlueCatalogId, exportRunId, exportBatchId, published);
			} catch (Exception e) {
				e.printStackTrace();
				System.out.printf("Export status for Database '%s' could not be tracked in DynamoDB table '%s'. \n",
						db.getName(), ddbTblName);
			}
		}
		System.out.println("Number of databases exported to SNS: " + numberOfDatabasesExported);
		return numberOfDatabasesExported;
	}