public boolean processsRecord()

in src/main/java/com/amazonaws/gdcreplication/lambda/ImportLargeTable.java [118:226]


	/**
	 * Imports one large table described by {@code message} into the target Glue
	 * catalog: creates or updates the table, then makes the target table's
	 * partitions match the partition list read from the S3 object referenced by the
	 * message, and finally records the outcome through {@code DDBUtil}.
	 *
	 * @param context                          Lambda context; not used by this method
	 * @param glue                             Glue client used for all table and partition calls
	 * @param sqs                              SQS client; not used by this method
	 * @param targetGlueCatalogId              catalog id the table and partitions are written to
	 * @param ddbTblNameForTableStatusTracking name of the table passed to the status tracking call
	 * @param message                          JSON document that is parsed into a {@code LargeTable}
	 * @param skipTableArchive                 forwarded to {@code GlueUtil.createOrUpdateTable}
	 * @param exportBatchId                    export batch id stored with the tracked status
	 * @param sourceGlueCatalogId              source catalog id stored with the tracked status
	 * @param region                           region forwarded to {@code S3Util.getPartitionsFromS3}
	 * @return {@code true} only when the partition add and/or delete calls that were
	 *         required all reported success; {@code false} in every other case,
	 *         including when the message cannot be parsed
	 */
	public boolean processsRecord(Context context, AWSGlue glue, AmazonSQS sqs,
			String targetGlueCatalogId, String ddbTblNameForTableStatusTracking, String message,
			boolean skipTableArchive, String exportBatchId, String sourceGlueCatalogId, String region) {

		boolean recordProcessed = false;
		Gson gson = new Gson();
		S3Util s3Util = new S3Util();
		DDBUtil ddbUtil = new DDBUtil();
		GlueUtil glueUtil = new GlueUtil();

		long importRunId = System.currentTimeMillis();

		// Parse input message to LargeTable object
		LargeTable largeTable = null;
		try {
			largeTable = gson.fromJson(message, LargeTable.class);
		} catch (JsonSyntaxException e) {
			System.out.println("Cannot parse SNS message to Glue Table Type.");
			e.printStackTrace();
		}

		// Gson yields null for a null/empty document, and the catch above leaves
		// largeTable null as well. Without a table there is no status object to
		// inspect or track, so stop here instead of failing with a NullPointerException.
		if (largeTable == null) {
			System.out.println("No table definition could be read from the message. Record was not processed.");
			return false;
		}

		// Create or update Table
		TableReplicationStatus tableStatus = glueUtil.createOrUpdateTable(glue, largeTable.getTable(),
				targetGlueCatalogId, skipTableArchive);
		tableStatus.setTableSchema(message);

		// Update table partitions
		if (!tableStatus.isError()) {
			// Get partitions from S3
			List<Partition> partitionListFromExport = s3Util.getPartitionsFromS3(region, largeTable.getS3BucketName(),
					largeTable.getS3ObjectKey());

			String databaseName = largeTable.getTable().getDatabaseName();
			String tableName = largeTable.getTable().getName();

			// Get table partitions from Target Account
			List<Partition> partitionsB4Replication = glueUtil.getPartitions(glue, targetGlueCatalogId, databaseName,
					tableName);
			System.out.println("Number of partitions before replication: " + partitionsB4Replication.size());

			boolean exportHasPartitions = !partitionListFromExport.isEmpty();
			boolean targetHasPartitions = !partitionsB4Replication.isEmpty();

			if (tableStatus.isReplicated() && exportHasPartitions) {
				// Add Partitions to the table if the export has Partitions
				tableStatus.setExportHasPartitions(true);
				if (!targetHasPartitions) {
					System.out.println("Adding partitions based on the export.");
					boolean partitionsAdded = glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId,
							databaseName, tableName);
					if (partitionsAdded) {
						tableStatus.setPartitionsReplicated(true);
						recordProcessed = true;
					}
				} else {
					System.out.println(
							"Target table has partitions. They will be deleted first before adding partitions based on Export.");
					// delete partitions in batch mode
					boolean partitionsDeleted = glueUtil.deletePartitions(glue, targetGlueCatalogId, databaseName,
							tableName, partitionsB4Replication);

					// add partitions from S3 object; attempted even when the delete reported
					// failure, but success is only recorded when both calls succeeded
					boolean partitionsAdded = glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId,
							databaseName, tableName);

					if (partitionsDeleted && partitionsAdded) {
						tableStatus.setPartitionsReplicated(true);
						recordProcessed = true;
					}
				}
			} else if (tableStatus.isReplicated()) {
				// Export has no partitions
				tableStatus.setExportHasPartitions(false);
				if (targetHasPartitions) {
					// Export has no partitions but table already has some partitions. Those
					// partitions will be deleted in batch mode.
					boolean partitionsDeleted = glueUtil.deletePartitions(glue, targetGlueCatalogId, databaseName,
							tableName, partitionsB4Replication);
					if (partitionsDeleted) {
						tableStatus.setPartitionsReplicated(true);
						recordProcessed = true;
					}
				}
				// NOTE(review): when neither the export nor the target has partitions the
				// method still returns false - confirm the caller expects that.
			}
		} else {
			// Table create/update reported an error, so partitions were not attempted
			System.out.println("Error in creating or updating the table. Partitions were not replicated.");
		}

		// Track status in DynamoDB
		ddbUtil.trackTableImportStatus(tableStatus, sourceGlueCatalogId, targetGlueCatalogId, importRunId,
				exportBatchId, ddbTblNameForTableStatusTracking);
		System.out.printf(
				"Processing of Table shcema completed. Result: Table replicated: %b, Export has partitions: %b, "
						+ "Partitions replicated: %b, error: %b \n",
				tableStatus.isReplicated(), tableStatus.isExportHasPartitions(), tableStatus.isPartitionsReplicated(),
				tableStatus.isError());

		return recordProcessed;
	}