public String handleRequest()

in src/main/java/com/amazonaws/gdcreplication/lambda/ImportLargeTable.java [33:84]


	public String handleRequest(SQSEvent event, Context context) {

		String region = Optional.ofNullable(System.getenv("region")).orElse(Regions.US_EAST_1.getName());
		String targetGlueCatalogId = Optional.ofNullable(System.getenv("target_glue_catalog_id")).orElse("1234567890");
		boolean skipTableArchive = Boolean
				.parseBoolean(Optional.ofNullable(System.getenv("skip_archive")).orElse("true"));
		String ddbTblNameForTableStatusTracking = Optional.ofNullable(System.getenv("ddb_name_table_import_status"))
				.orElse("ddb_name_table_import_status");
		boolean recordProcessed = false;
		
		// Print environment variables
		printEnvVariables(targetGlueCatalogId, skipTableArchive, ddbTblNameForTableStatusTracking, region);
				
		// Set client configuration
		ClientConfiguration cc = new ClientConfiguration();
		cc.setMaxErrorRetry(10);

		// Create Objects for Glue and SQS
		AWSGlue glue = AWSGlueClientBuilder.standard().withRegion(region).withClientConfiguration(cc).build();
		AmazonSQS sqs = AmazonSQSClientBuilder.standard().withRegion(region).withClientConfiguration(cc).build();
		
		// Iterate and process all the messages which are part of SQSEvent
		System.out.println("Number of messages in SQS Event: " + event.getRecords().size());
		for (SQSMessage msg : event.getRecords()) {
			String ddl = new String(msg.getBody());
			String exportBatchId = "";
			String schemaType = "";
			String sourceGlueCatalogId = "";
			// Read Message Attributes
			for (Entry<String, MessageAttribute> entry : msg.getMessageAttributes().entrySet()) {
				if ("ExportBatchId".equalsIgnoreCase(entry.getKey())) {
					exportBatchId = entry.getValue().getStringValue();
					System.out.println("Export Batch Id: " + exportBatchId);
				} else if ("SourceGlueDataCatalogId".equalsIgnoreCase(entry.getKey())) {
					sourceGlueCatalogId = entry.getValue().getStringValue();
					System.out.println("Source Glue Data Cagalog Id: " + sourceGlueCatalogId);
				} else if ("SchemaType".equalsIgnoreCase(entry.getKey())) {
					schemaType = entry.getValue().getStringValue();
					System.out.println("Message Schema Type " + schemaType);
				}
			}
			if (schemaType.equalsIgnoreCase("largeTable")) {
				recordProcessed = processsRecord(context, glue, sqs, targetGlueCatalogId, ddbTblNameForTableStatusTracking,
						ddl, skipTableArchive, exportBatchId, sourceGlueCatalogId, region);
			}
			if (!recordProcessed) {
				System.out.printf("Input message '%s' could not be processed. This is an exception. It will be reprocessed again. \n", ddl);
				throw new RuntimeException();
			}
		}
		return "Success";
	}