in src/main/java/com/amazonaws/gdcreplication/util/GlueUtil.java [397:451]
public boolean addPartitions(AWSGlue glue, List<Partition> partitionsToAdd, String catalogId, String databaseName,
String tableName) {
AtomicInteger numPartitionsAdded = new AtomicInteger();
boolean partitionsAdded = false;
BatchCreatePartitionRequest batchCreatePartitionRequest = new BatchCreatePartitionRequest();
batchCreatePartitionRequest.setCatalogId(catalogId);
batchCreatePartitionRequest.setDatabaseName(databaseName);
batchCreatePartitionRequest.setTableName(tableName);
List<PartitionInput> partitionInputList = new ArrayList<PartitionInput>();
for (Partition p : partitionsToAdd) {
PartitionInput pi = new PartitionInput();
StorageDescriptor storageDescriptor = p.getStorageDescriptor();
pi.setStorageDescriptor(storageDescriptor);
pi.setValues(p.getValues());
partitionInputList.add(pi);
}
System.out.println("Partition Input List Size: " + partitionInputList.size());
if(partitionInputList.size() > 100)
System.out.println("The input has more than 100 partitions, it will be sliced into smaller lists with 100 partitions each.");
List<List<PartitionInput>> listofSmallerLists = Lists.partition(partitionInputList, 100);
for(List<PartitionInput> partInputList : listofSmallerLists) {
batchCreatePartitionRequest.setPartitionInputList(partInputList);
try {
BatchCreatePartitionResult result = glue.batchCreatePartition(batchCreatePartitionRequest);
int statusCode = result.getSdkHttpMetadata().getHttpStatusCode();
List<PartitionError> partErrors = result.getErrors();
if (statusCode == 200 && partErrors.size() == 0) {
System.out.printf("%d partitions were added to table '%s' of database '%s'. \n", partInputList.size(),
tableName, databaseName);
partitionsAdded = true;
numPartitionsAdded.getAndAdd(partInputList.size());
System.out.printf("%d of %d partitions added so far. \n", numPartitionsAdded.get(), partitionInputList.size());
} else {
System.out.printf("Not all partitions were added. Status Code: %d, Number of partition errors: %d \n",
statusCode, partErrors.size());
for (PartitionError pe : partErrors) {
System.out.println("Partition Error Message: " + pe.getErrorDetail().getErrorMessage());
List<String> pv = pe.getPartitionValues();
for (String v : pv) {
System.out.println("Partition error value: " + v);
}
}
}
} catch(Exception e) {
e.printStackTrace();
System.out.printf("Exception in adding partitions. \n");
System.out.printf("%d of %d partitions added so far. \n", numPartitionsAdded.get(), partitionInputList.size());
// TODO - what to do when there are exceptions here?
}
}
System.out.println("Total partitions added: " + numPartitionsAdded.get());
return partitionsAdded;
}