in src/main/java/com/amazonaws/gdcreplication/util/DDBUtil.java [135:176]
/**
 * Writes one table-export status item to the given DynamoDB table.
 *
 * <p>The item's primary key attribute {@code table_id} is
 * {@code glueTableName + "|" + glueDBName}. The S3 attributes
 * ({@code s3_bucket_name}, {@code object_key}) are added only when both
 * {@code bucketName} and {@code objectKey} are non-null.
 *
 * <p>If any of {@code glueDBName}, {@code glueTableName}, {@code glueTableSchema}
 * or {@code snsMsgId} is {@code null}, nothing is written, a message is printed
 * and {@code false} is returned.
 *
 * @param ddbTblName      name of the DynamoDB table that receives the item
 * @param glueDBName      Glue database name; second part of {@code table_id}
 * @param glueTableName   Glue table name; first part of {@code table_id}
 * @param glueTableSchema stored in the {@code table_schema} attribute
 * @param snsMsgId        stored in the {@code sns_msg_id} attribute
 * @param glueCatalogId   stored in the {@code source_glue_catalog_id} attribute
 * @param exportRunId     stored in the {@code export_run_id} attribute
 * @param exportBatchId   stored in the {@code export_batch_id} attribute
 * @param isExported      stored in the {@code is_exported} attribute
 * @param isLargeTable    stored in the {@code is_large_table} attribute
 * @param bucketName      optional; stored in {@code s3_bucket_name} when {@code objectKey} is also set
 * @param objectKey       optional; stored in {@code object_key} when {@code bucketName} is also set
 * @return {@code true} only when the put request completed with HTTP status 200;
 *         {@code false} when required values are missing or the write failed
 */
public boolean trackTableExportStatus(String ddbTblName, String glueDBName, String glueTableName,
        String glueTableSchema, String snsMsgId, String glueCatalogId, long exportRunId, String exportBatchId,
        boolean isExported, boolean isLargeTable, String bucketName, String objectKey) {
    // Plain null checks: Optional.of(...) throws NullPointerException on null,
    // which made the "values missing" branch below unreachable.
    if (glueDBName == null || glueTableName == null || glueTableSchema == null || snsMsgId == null) {
        System.out.println("Not all the values present to insert Table item to DynamoDB table: " + ddbTblName);
        return false;
    }

    // Build the item before any client exists, so a failure while building it
    // cannot leave an open client behind.
    Item item = new Item().withPrimaryKey("table_id", glueTableName.concat("|").concat(glueDBName))
            .withNumber("export_run_id", exportRunId).withString("export_batch_id", exportBatchId)
            .withString("source_glue_catalog_id", glueCatalogId).withString("table_schema", glueTableSchema)
            .withString("sns_msg_id", snsMsgId).withBoolean("is_exported", isExported)
            .withBoolean("is_large_table", isLargeTable);
    if (bucketName != null && objectKey != null) {
        item.withString("s3_bucket_name", bucketName).withString("object_key", objectKey);
    }

    ClientConfiguration cc = new ClientConfiguration();
    cc.setMaxErrorRetry(10);
    AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().withClientConfiguration(cc).build();
    DynamoDB dynamoDB = new DynamoDB(client);

    boolean itemInserted = false;
    try {
        com.amazonaws.services.dynamodbv2.document.Table table = dynamoDB.getTable(ddbTblName);
        // Write the item to the table
        PutItemOutcome outcome = table.putItem(item);
        int statusCode = outcome.getPutItemResult().getSdkHttpMetadata().getHttpStatusCode();
        if (statusCode == 200) {
            itemInserted = true;
            System.out.println("Table item inserted to DynamoDB table. Table name: " + glueTableName);
        }
    } catch (Exception e) {
        e.printStackTrace();
        System.out.println("Could not insert a Table export status to DynamoDB table: " + ddbTblName);
    } finally {
        // Always release the client, whatever happened above.
        dynamoDB.shutdown();
    }
    return itemInserted;
}