public static void processSNSEvent()

in src/main/java/com/amazonaws/gdcreplication/lambda/ExportDatabaseWithTables.java [120:233]


	public static void processSNSEvent(List<SNSRecord> snsRecods, Context context, AWSGlue glue, AmazonSNS sns,
			AmazonSQS sqs, String sourceGlueCatalogId, String ddbTblNameForDBStatusTracking,
			String ddbTblNameForTableStatusTracking, String topicArn, String sqsQueue4LargePartTables,
			int partitionThreshold) {
		Database db = null;
		Gson gson = new Gson();
		DDBUtil ddbUtil = new DDBUtil();
		SNSUtil snsUtil = new SNSUtil();
		GlueUtil glueUtil = new GlueUtil();
		SQSUtil sqsUtil = new SQSUtil();
		long exportRunId = System.currentTimeMillis();
		
		for (SNSRecord snsRecod : snsRecods) {
			
			List<WriteRequest> itemList = new ArrayList<WriteRequest>();
			
			boolean isDatabaseType = false;
			AtomicInteger numberOfTablesExported = new AtomicInteger();
			String databaseDDL = snsRecod.getSNS().getMessage();
			context.getLogger().log("SNS Message Payload: " + databaseDDL);
			Map<String, MessageAttribute> msgAttributeMap = snsRecod.getSNS().getMessageAttributes();
			MessageAttribute msgAttrMessageType = msgAttributeMap.get("message_type");
			MessageAttribute msgAttrExportBatchId = msgAttributeMap.get("export_batch_id");
			
			context.getLogger().log("Message Attribute value: " + msgAttrMessageType.getValue());
			// Convert Message to Glue Database Type
			try {
				if (msgAttrMessageType.getValue().equalsIgnoreCase("database")) {
					db = gson.fromJson(databaseDDL, Database.class);
					isDatabaseType = true;
				}
			} catch (JsonSyntaxException e) {
				System.out.println("Cannot parse SNS message to Glue Database Type.");
				e.printStackTrace();
			}
			if (isDatabaseType) {
				// Check if a database exist in Glue
				Database database = glueUtil.getDatabaseIfExist(glue, sourceGlueCatalogId, db);
				if (Optional.ofNullable(database).isPresent()) {
					PublishResult publishDBResponse = snsUtil.publishDatabaseSchemaToSNS(sns, topicArn, databaseDDL,
							sourceGlueCatalogId, msgAttrExportBatchId.getValue());
					if (Optional.ofNullable(publishDBResponse.getMessageId()).isPresent()) {
						System.out.println("Database schema published to SNS Topic. Message_Id: "
								+ publishDBResponse.getMessageId());
						ddbUtil.trackDatabaseExportStatus(ddbTblNameForDBStatusTracking, db.getName(), databaseDDL,
								publishDBResponse.getMessageId(), sourceGlueCatalogId, exportRunId, msgAttrExportBatchId.getValue(), true);
					} else {
						ddbUtil.trackDatabaseExportStatus(ddbTblNameForDBStatusTracking, db.getName(), databaseDDL, "",
								sourceGlueCatalogId, exportRunId, msgAttrExportBatchId.getValue(), false);
					}
					// Get Tables for a given Database
					List<Table> dbTableList = glueUtil.getTables(glue, sourceGlueCatalogId, database.getName());
					for (Table table : dbTableList) {
						List<Partition> partitionList = glueUtil.getPartitions(glue, sourceGlueCatalogId, table.getDatabaseName(), table.getName());
						if(partitionList.size() <= partitionThreshold) {
							System.out.printf("Database: %s, Table: %s, num_partitions: %d \n", table.getDatabaseName(), table.getName(), partitionList.size());
							TableWithPartitions tableWithParts = new TableWithPartitions();
							tableWithParts.setPartitionList(partitionList);
							tableWithParts.setTable(table);
							
							// Convert Table to JSON String
							String tableDDL = gson.toJson(tableWithParts);
							
							// Publish a message to Amazon SNS topic.
							PublishResult publishTableResponse = snsUtil.publishTableSchemaToSNS(sns, topicArn, table, tableDDL,
									sourceGlueCatalogId, msgAttrExportBatchId.getValue());
							
							Map<String, AttributeValue> item = new HashMap<String, AttributeValue>();
							item.put("table_id", new AttributeValue().withS(table.getName().concat("|").concat(table.getDatabaseName())));
							item.put("export_run_id", new AttributeValue().withN(Long.valueOf(exportRunId).toString()));
							item.put("export_batch_id", new AttributeValue().withS(msgAttrExportBatchId.getValue()));
							item.put("source_glue_catalog_id", new AttributeValue().withS(sourceGlueCatalogId));
							item.put("table_schema", new AttributeValue().withS(tableDDL)); 
							item.put("is_large_table", new AttributeValue().withS(Boolean.toString(false)));
							
							if (Optional.ofNullable(publishTableResponse.getMessageId()).isPresent()) {
								item.put("sns_msg_id", new AttributeValue().withS(publishTableResponse.getMessageId()));
								item.put("is_exported", new AttributeValue().withS(Boolean.toString(true)));	
								numberOfTablesExported.getAndIncrement();
							} else {
								item.put("sns_msg_id", new AttributeValue().withS(""));
								item.put("is_exported", new AttributeValue().withS(Boolean.toString(false)));	
							}
							
							itemList.add(new WriteRequest().withPutRequest(new PutRequest().withItem(item)));
						} else {
							LargeTable largeTable = new LargeTable();
							largeTable.setTable(table);
							largeTable.setLargeTable(true);
							largeTable.setNumberOfPartitions(partitionList.size());
							largeTable.setCatalogId(sourceGlueCatalogId);
							
							System.out.printf("Database: %s, Table: %s, num_partitions: %d \n", table.getDatabaseName(), table.getName(), partitionList.size());
							System.out.println("This will be sent to SQS Queue for furhter processing.");
							
							sqsUtil.sendTableSchemaToSQSQueue(sqs, sqsQueue4LargePartTables, largeTable, msgAttrExportBatchId.getValue(), sourceGlueCatalogId);
						}
					}
					System.out.printf("Inserting Table statistics to DynamoDB for database: %s \n", database.getName());
					ddbUtil.insertIntoDynamoDB(itemList, ddbTblNameForTableStatusTracking);
					System.out.printf(
							"Table export statistics: number of tables exist in Database = %d, number of tables exported to SNS = %d. \n",
							dbTableList.size(), numberOfTablesExported.get());
				} else
					System.out.printf(
							"There is no Database with name '%s' exist in Glue Data Catalog. Tables cannot be retrieved. \n",
							database.getName());
			} else {
				System.out.println(
						"Message received from SNS Topic seems to be invalid. It could not be converted to Glue Database Type.");
			}

		}
	}