in src/main/java/com/amazonaws/gdcreplication/lambda/ImportLargeTable.java [118:226]
/**
 * Imports one large-table record into the target Glue Data Catalog.
 *
 * <p>The record is a JSON-serialized {@code LargeTable}. The table is created or updated in the
 * target catalog. If that succeeds and the table is reported as replicated, the target table's
 * partitions are made to match the partition list stored in the export object in S3:
 * <ul>
 * <li>If the target has no partitions, the exported partitions are added.</li>
 * <li>If the target has partitions, they are deleted and the exported partitions are added.</li>
 * <li>If the export has no partitions, any existing target partitions are deleted.</li>
 * </ul>
 * The resulting status is tracked in DynamoDB whenever a table could be read from the message.
 *
 * @param context                          Lambda context (not used here)
 * @param glue                             Glue client for the target catalog
 * @param sqs                              SQS client (not used here)
 * @param targetGlueCatalogId              target Glue catalog id
 * @param ddbTblNameForTableStatusTracking DynamoDB table used for status tracking
 * @param message                          JSON representation of a {@code LargeTable}
 * @param skipTableArchive                 passed through to {@code GlueUtil.createOrUpdateTable}
 * @param exportBatchId                    export batch id recorded with the import status
 * @param sourceGlueCatalogId              source Glue catalog id recorded with the import status
 * @param region                           region used to read the partition export from S3
 * @return true when the table and its partitions were brought in line with the export; false when
 *         the message could not be parsed, the table could not be created/updated, or a partition
 *         operation failed
 */
public boolean processsRecord(Context context, AWSGlue glue, AmazonSQS sqs,
		String targetGlueCatalogId, String ddbTblNameForTableStatusTracking, String message,
		boolean skipTableArchive, String exportBatchId, String sourceGlueCatalogId, String region) {
	Gson gson = new Gson();
	S3Util s3Util = new S3Util();
	DDBUtil ddbUtil = new DDBUtil();
	GlueUtil glueUtil = new GlueUtil();
	long importRunId = System.currentTimeMillis();

	// Parse input message to LargeTable object
	LargeTable largeTable = null;
	try {
		largeTable = gson.fromJson(message, LargeTable.class);
	} catch (JsonSyntaxException e) {
		System.out.println("Cannot parse SNS message to Glue Table Type.");
		e.printStackTrace();
	}
	// Without a parsed table there is no replication status to act on or to track. Previously this
	// case fell through and failed with a NullPointerException on the (null) table status.
	if (largeTable == null) {
		System.out.println("No table could be read from the message. Record not processed.");
		return false;
	}

	// Create or update Table
	TableReplicationStatus tableStatus = glueUtil.createOrUpdateTable(glue, largeTable.getTable(),
			targetGlueCatalogId, skipTableArchive);
	tableStatus.setTableSchema(message);

	boolean recordProcessed = false;
	if (!tableStatus.isError()) {
		recordProcessed = replicatePartitions(glue, glueUtil, s3Util, largeTable, tableStatus,
				targetGlueCatalogId, region);
	} else {
		// Creating/updating the table failed, so partition replication was not attempted.
		System.out.println(
				"Table was not created or updated due to an error. Partitions were not replicated. Message will be reprocessed again.");
	}

	// Track status in DynamoDB
	ddbUtil.trackTableImportStatus(tableStatus, sourceGlueCatalogId, targetGlueCatalogId, importRunId,
			exportBatchId, ddbTblNameForTableStatusTracking);
	System.out.printf(
			"Processing of Table schema completed. Result: Table replicated: %b, Export has partitions: %b, "
					+ "Partitions replicated: %b, error: %b \n",
			tableStatus.isReplicated(), tableStatus.isExportHasPartitions(), tableStatus.isPartitionsReplicated(),
			tableStatus.isError());
	return recordProcessed;
}

/**
 * Reads the exported partition list from S3 and the current partitions of the target table, then
 * synchronizes the target table's partitions with the export. Partition work only happens when the
 * table itself was reported as replicated.
 *
 * @return true when the target table's partitions match the export afterwards
 */
private boolean replicatePartitions(AWSGlue glue, GlueUtil glueUtil, S3Util s3Util, LargeTable largeTable,
		TableReplicationStatus tableStatus, String targetGlueCatalogId, String region) {
	String databaseName = largeTable.getTable().getDatabaseName();
	String tableName = largeTable.getTable().getName();

	// Get partitions from S3
	List<Partition> partitionListFromExport = s3Util.getPartitionsFromS3(region, largeTable.getS3BucketName(),
			largeTable.getS3ObjectKey());
	// Get table partitions from Target Account
	List<Partition> partitionsB4Replication = glueUtil.getPartitions(glue, targetGlueCatalogId, databaseName,
			tableName);
	System.out.println("Number of partitions before replication: " + partitionsB4Replication.size());

	if (!tableStatus.isReplicated()) {
		return false;
	}

	boolean synced;
	if (!partitionListFromExport.isEmpty()) {
		tableStatus.setExportHasPartitions(true);
		synced = replaceWithExportedPartitions(glue, glueUtil, targetGlueCatalogId, databaseName, tableName,
				partitionListFromExport, partitionsB4Replication);
	} else {
		tableStatus.setExportHasPartitions(false);
		if (partitionsB4Replication.isEmpty()) {
			// Neither side has partitions: the target already matches the export. Previously this
			// case reported the record as unprocessed even though nothing was left to do.
			synced = true;
		} else {
			// Export has no partitions but table already has some partitions. Those
			// partitions will be deleted in batch mode.
			synced = glueUtil.deletePartitions(glue, targetGlueCatalogId, databaseName, tableName,
					partitionsB4Replication);
		}
	}
	if (synced) {
		tableStatus.setPartitionsReplicated(true);
	}
	return synced;
}

/**
 * Makes the target table hold exactly the exported partitions. If the target already has
 * partitions, they are deleted first in batch mode.
 *
 * @return true when the deletion (if needed) and the addition both succeeded
 */
private boolean replaceWithExportedPartitions(AWSGlue glue, GlueUtil glueUtil, String targetGlueCatalogId,
		String databaseName, String tableName, List<Partition> partitionListFromExport,
		List<Partition> partitionsB4Replication) {
	if (partitionsB4Replication.isEmpty()) {
		System.out.println("Adding partitions based on the export.");
		return glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId, databaseName, tableName);
	}
	System.out.println(
			"Target table has partitions. They will be deleted first before adding partitions based on Export.");
	boolean partitionsDeleted = glueUtil.deletePartitions(glue, targetGlueCatalogId, databaseName, tableName,
			partitionsB4Replication);
	// The addition is attempted even if the deletion reported a failure (original behavior). The
	// record only counts as processed when both steps succeeded.
	boolean partitionsAdded = glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId,
			databaseName, tableName);
	return partitionsDeleted && partitionsAdded;
}