in src/main/java/com/amazonaws/gdcreplication/util/GDCUtil.java [30:125]
public void processTableSchema(AWSGlue glue, AmazonSQS sqs, String targetGlueCatalogId, String sourceGlueCatalogId,
TableWithPartitions tableWithPartitions, String message, String ddbTblNameForTableStatusTracking,
String sqsQueueURL, String exportBatchId, boolean skipTableArchive) {
DDBUtil ddbUtil = new DDBUtil();
SQSUtil sqsUtil = new SQSUtil();
GlueUtil glueUtil = new GlueUtil();
long importRunId = System.currentTimeMillis();
// Get Table and its Partitions from Input JSON
Table table = tableWithPartitions.getTable();
List<Partition> partitionListFromExport = tableWithPartitions.getPartitionList();
// Create or update table
TableReplicationStatus tableStatus = glueUtil.createOrUpdateTable(glue, table, targetGlueCatalogId,
skipTableArchive);
// If database not found then create one
if (tableStatus.isDbNotFoundError()) {
System.out.printf("Creating Database with name: '%s'. \n", table.getDatabaseName());
DBReplicationStatus dbStatus = glueUtil.createGlueDatabase(glue, targetGlueCatalogId,
table.getDatabaseName(),
"Database Imported from Glue Data Catalog of AWS Account Id: ".concat(sourceGlueCatalogId));
// Now, try to create / update table again.
if (dbStatus.isCreated()) {
tableStatus = glueUtil.createOrUpdateTable(glue, tableWithPartitions.getTable(), targetGlueCatalogId,
skipTableArchive);
}
}
tableStatus.setTableSchema(message);
// Update table partitions
if (!tableStatus.isError()) {
// Get table partitions from Target Account
List<Partition> partitionsB4Replication = glueUtil.getPartitions(glue, targetGlueCatalogId,
table.getDatabaseName(), table.getName());
System.out.println("Number of partitions before replication: " + partitionsB4Replication.size());
// Add Partitions to the table if the export has Partitions
if (partitionListFromExport.size() > 0) {
tableStatus.setExportHasPartitions(true);
if (partitionsB4Replication.size() == 0) {
System.out.println("Adding partitions based on the export.");
boolean partitionsAdded = glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId,
table.getDatabaseName(), table.getName());
if (partitionsAdded)
tableStatus.setPartitionsReplicated(true);
} else {
System.out.println(
"Table has partitions. They will be deleted first before adding partitions based on Export.");
// delete partitions in batch mode
boolean partitionsDeleted = glueUtil.deletePartitions(glue, targetGlueCatalogId,
table.getDatabaseName(), table.getName(), partitionsB4Replication);
// Enable the below code for debugging purpose. Check number of table partitions after deletion
// List<Partition> partitionsAfterDeletion = glueUtil.getPartitions(glue, targetGlueCatalogId,
// table.getDatabaseName(), table.getName());
// System.out.println("Number of partitions after deletion: " + partitionsAfterDeletion.size());
// add partitions from S3 object
boolean partitionsAdded = glueUtil.addPartitions(glue, partitionListFromExport, targetGlueCatalogId,
table.getDatabaseName(), table.getName());
if (partitionsDeleted && partitionsAdded)
tableStatus.setPartitionsReplicated(true);
// Enable the below code for debugging purpose. Check number of table partitions after addition
// List<Partition> partitionsAfterAddition = glueUtil.getPartitions(glue, targetGlueCatalogId,
// table.getDatabaseName(), table.getName());
// System.out.println("Number of partitions after addition: " + partitionsAfterAddition.size());
}
} else if (partitionListFromExport.size() == 0) {
tableStatus.setExportHasPartitions(false);
if (partitionsB4Replication.size() > 0) {
// Export has no partitions but table already has some partitions. Those
// partitions will be deleted in batch mode.
boolean partitionsDeleted = glueUtil.deletePartitions(glue, targetGlueCatalogId,
table.getDatabaseName(), table.getName(), partitionsB4Replication);
if (partitionsDeleted)
tableStatus.setPartitionsReplicated(true);
}
}
}
// If there is any error in creating/updating table then send it to DLQ
else {
System.out.println("Error in creating/updating table in the Glue Data Catalog. It will be send to DLQ.");
sqsUtil.sendTableSchemaToDeadLetterQueue(sqs, sqsQueueURL, tableStatus, exportBatchId, sourceGlueCatalogId);
}
// Track status in DynamoDB
ddbUtil.trackTableImportStatus(tableStatus, sourceGlueCatalogId, targetGlueCatalogId, importRunId,
exportBatchId, ddbTblNameForTableStatusTracking);
System.out.printf(
"Processing of Table shcema completed. Result: Table replicated: %b, Export has partitions: %b, "
+ "Partitions replicated: %b, error: %b \n",
tableStatus.isReplicated(), tableStatus.isExportHasPartitions(), tableStatus.isPartitionsReplicated(),
tableStatus.isError());
}