in xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java [99:152]
/**
 * Adds the given partitions to the Glue table referenced by {@code catalogTableIdentifier}.
 *
 * <p>The partitions are converted to Glue {@code PartitionInput}s and submitted through
 * {@code BatchCreatePartition} calls, each carrying at most
 * {@code glueCatalogConfig.getMaxPartitionsPerRequest()} entries. All batches are submitted
 * before any response is inspected. A response whose errors are exclusively
 * {@code AlreadyExistsException} is only logged as a warning; any other error fails the call.
 *
 * @param catalogTableIdentifier identifier of the table to add partitions to
 * @param partitionsToAdd partitions to create; when empty the method logs and returns without
 *     calling Glue
 * @throws CatalogSyncException if the table lookup, partition conversion or a Glue call fails,
 *     or if Glue reports an error other than {@code AlreadyExistsException} for any partition
 */
public void addPartitionsToTable(
    CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> partitionsToAdd) {
  HierarchicalTableIdentifier tableIdentifier =
      toHierarchicalTableIdentifier(catalogTableIdentifier);
  if (partitionsToAdd.isEmpty()) {
    log.info("No partitions to add for {}", tableIdentifier);
    return;
  }
  log.info("Adding {} CatalogPartition(s) in table {}", partitionsToAdd.size(), tableIdentifier);
  try {
    Table table =
        GlueCatalogTableUtils.getTable(
            glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier);
    List<PartitionInput> partitionInputs =
        partitionsToAdd.stream()
            .map(partition -> createPartitionInput(table, partition))
            .collect(Collectors.toList());

    // Phase 1: submit every batch and keep the responses for later inspection.
    List<BatchCreatePartitionResponse> responses = new ArrayList<>();
    CollectionUtils.batches(partitionInputs, glueCatalogConfig.getMaxPartitionsPerRequest())
        .forEach(
            batch -> {
              BatchCreatePartitionRequest request =
                  BatchCreatePartitionRequest.builder()
                      // Target the same catalog the table was looked up in; without this the
                      // request is routed to the caller's default catalog.
                      .catalogId(glueCatalogConfig.getCatalogId())
                      .databaseName(tableIdentifier.getDatabaseName())
                      .tableName(tableIdentifier.getTableName())
                      .partitionInputList(batch)
                      .build();
              responses.add(glueClient.batchCreatePartition(request));
            });

    // Phase 2: validate the collected responses.
    for (BatchCreatePartitionResponse response : responses) {
      if (!CollectionUtils.nonEmpty(response.errors())) {
        continue;
      }
      // An error entry without details cannot be classified as "already exists", so it is
      // treated as a real failure instead of triggering a NullPointerException here.
      boolean onlyAlreadyExists =
          response.errors().stream()
              .allMatch(
                  error ->
                      error.errorDetail() != null
                          && "AlreadyExistsException".equals(error.errorDetail().errorCode()));
      if (!onlyAlreadyExists) {
        throw new CatalogSyncException(
            "Fail to add partitions to "
                + tableIdentifier
                + " with error(s): "
                + response.errors());
      }
      log.warn("Partitions already exist in glue: {}", response.errors());
    }
  } catch (CatalogSyncException e) {
    // Already a CatalogSyncException with its own message: rethrow as-is rather than burying
    // it as the cause of a second, less specific CatalogSyncException.
    throw e;
  } catch (Exception e) {
    throw new CatalogSyncException("Fail to add partitions to " + tableIdentifier, e);
  }
}