public void processSNSEvent()

in src/main/java/com/amazonaws/gdcreplication/lambda/ImportDatabaseOrTable.java [90:147]


	public void processSNSEvent(Context context, List<SNSRecord> snsRecods, AWSGlue glue, AmazonSQS sqs,
			String sqsQueueURL, String sqsQueueURLLargeTable, String targetGlueCatalogId,
			String ddbTblNameForDBStatusTracking, String ddbTblNameForTableStatusTracking, boolean skipTableArchive,
			String region) {

		SQSUtil sqsUtil = new SQSUtil();
		for (SNSRecord snsRecod : snsRecods) {
			boolean isDatabaseType = false;
			boolean isTableType = false;
			boolean isLargeTable = false;
			LargeTable largeTable = null;
			Database db = null;
			TableWithPartitions table = null;
			Gson gson = new Gson();
			String message = snsRecod.getSNS().getMessage();
			context.getLogger().log("SNS Message Payload: " + message);
			
			// Get message attributes from the SNS Payload
			Map<String, MessageAttribute> msgAttributeMap = snsRecod.getSNS().getMessageAttributes();
			MessageAttribute msgTypeAttr = msgAttributeMap.get("message_type");
			MessageAttribute sourceCatalogIdAttr = msgAttributeMap.get("source_catalog_id");
			MessageAttribute exportBatchIdAttr = msgAttributeMap.get("export_batch_id");
			String sourceGlueCatalogId = sourceCatalogIdAttr.getValue();
			String exportBatchId = exportBatchIdAttr.getValue();
			context.getLogger().log("Message Type: " + msgTypeAttr.getValue());
			context.getLogger().log("Source Catalog Id: " + sourceGlueCatalogId);
			
			// Serialize JSON String based on the message type
			try {
				if (msgTypeAttr.getValue().equalsIgnoreCase("database")) {
					db = gson.fromJson(message, Database.class);
					isDatabaseType = true;
				} else if (msgTypeAttr.getValue().equalsIgnoreCase("table")) {
					table = gson.fromJson(message, TableWithPartitions.class);
					isTableType = true;
				} else if (msgTypeAttr.getValue().equalsIgnoreCase("largeTable")) {
					largeTable = gson.fromJson(message, LargeTable.class);
					isLargeTable = true;
				}
			} catch (JsonSyntaxException e) {
				System.out.println("Cannot parse SNS message to Glue Database Type.");
				e.printStackTrace();
			}
			
			// Execute the business logic based on the message type
			GDCUtil gdcUtil = new GDCUtil();
			if (isDatabaseType) {
				gdcUtil.processDatabseSchema(glue, sqs, targetGlueCatalogId, db, message, sqsQueueURL, sourceGlueCatalogId,
						exportBatchId, ddbTblNameForDBStatusTracking);
			} else if (isTableType) {
				gdcUtil.processTableSchema(glue, sqs, targetGlueCatalogId, sourceGlueCatalogId, table, message,
						ddbTblNameForTableStatusTracking, sqsQueueURL, exportBatchId, skipTableArchive);
			} else if (isLargeTable) {
				sqsUtil.sendLargeTableSchemaToSQS(sqs, sqsQueueURLLargeTable, exportBatchId, sourceGlueCatalogId,
						message, largeTable);
			}
		}
	}