in xtable-aws/src/main/java/org/apache/xtable/glue/GlueCatalogPartitionSyncOperations.java [155:207]
public void updatePartitionsToTable(
    CatalogTableIdentifier catalogTableIdentifier, List<CatalogPartition> changedPartitions) {
  // Updates the given partitions of a Glue table using BatchUpdatePartition requests.
  //
  // catalogTableIdentifier: identifies the table whose partitions are updated.
  // changedPartitions: partitions to update; an empty list is a logged no-op.
  //
  // Throws CatalogSyncException when the table lookup or any Glue call fails, or when a
  // batch response reports per-partition errors (the message then lists those errors).
  HierarchicalTableIdentifier tableIdentifier =
      toHierarchicalTableIdentifier(catalogTableIdentifier);
  if (changedPartitions.isEmpty()) {
    log.info("No partitions to change for {}", tableIdentifier.getTableName());
    return;
  }
  log.info("Updating {} partition(s) in table {}", changedPartitions.size(), tableIdentifier);

  // Error entries of the first batch response that reported any; stays null on success.
  // Recorded inside the try and thrown after it, so the detailed exception is not caught and
  // re-wrapped by this method's own catch block.
  String reportedErrors = null;
  try {
    Table table =
        GlueCatalogTableUtils.getTable(
            glueClient, glueCatalogConfig.getCatalogId(), catalogTableIdentifier);
    if (table == null) {
      // Fail before sending any update; a null table previously surfaced here as a bare
      // NullPointerException. Thrown inside the try so it is wrapped like any other failure.
      throw new IllegalStateException("Table not found: " + tableIdentifier);
    }
    List<BatchUpdatePartitionRequestEntry> updatePartitionEntries =
        changedPartitions.stream()
            .map(
                partition ->
                    BatchUpdatePartitionRequestEntry.builder()
                        .partitionInput(createPartitionInput(table, partition))
                        .partitionValueList(partition.getValues())
                        .build())
            .collect(Collectors.toList());

    // Every batch is sent before any response is inspected, so a failure reported for one
    // batch does not prevent the remaining batches from being submitted.
    List<BatchUpdatePartitionResponse> responses = new ArrayList<>();
    CollectionUtils.batches(
            updatePartitionEntries, glueCatalogConfig.getMaxPartitionsPerRequest())
        .forEach(
            batch -> {
              // NOTE(review): this request does not set a catalog id, although the table
              // lookup above passes glueCatalogConfig.getCatalogId() - confirm this is intended.
              BatchUpdatePartitionRequest request =
                  BatchUpdatePartitionRequest.builder()
                      .databaseName(tableIdentifier.getDatabaseName())
                      .tableName(tableIdentifier.getTableName())
                      .entries(batch)
                      .build();
              responses.add(glueClient.batchUpdatePartition(request));
            });

    for (BatchUpdatePartitionResponse response : responses) {
      if (CollectionUtils.nonEmpty(response.errors())) {
        reportedErrors = String.valueOf(response.errors());
        break;
      }
    }
  } catch (Exception e) {
    throw new CatalogSyncException("Fail to update partitions to " + tableIdentifier, e);
  }

  if (reportedErrors != null) {
    throw new CatalogSyncException(
        "Fail to update partitions to " + tableIdentifier + " with error(s): " + reportedErrors);
  }
}