in aws-glue-datacatalog-client-common/src/main/java/com/amazonaws/glue/catalog/metastore/GlueMetastoreClientDelegate.java [610:677]
private List<Partition> batchCreatePartitions(
final List<org.apache.hadoop.hive.metastore.api.Partition> hivePartitions,
final boolean ifNotExists
) throws TException {
if (hivePartitions.isEmpty()) {
return Lists.newArrayList();
}
final String dbName = hivePartitions.get(0).getDbName();
final String tableName = hivePartitions.get(0).getTableName();
org.apache.hadoop.hive.metastore.api.Table tbl = getTable(dbName, tableName);
validateInputForBatchCreatePartitions(tbl, hivePartitions);
List<Partition> catalogPartitions = Lists.newArrayList();
Map<PartitionKey, Path> addedPath = Maps.newHashMap();
try {
for (org.apache.hadoop.hive.metastore.api.Partition partition : hivePartitions) {
Path location = getPartitionLocation(tbl, partition);
boolean partDirCreated = false;
if (location != null) {
partition.getSd().setLocation(location.toString());
partDirCreated = makeDirs(wh, location);
}
Partition catalogPartition = HiveToCatalogConverter.convertPartition(partition);
catalogPartitions.add(catalogPartition);
if (partDirCreated) {
addedPath.put(new PartitionKey(catalogPartition), new Path(partition.getSd().getLocation()));
}
}
} catch (MetaException e) {
for (Path path : addedPath.values()) {
deletePath(path);
}
throw e;
}
List<Future<BatchCreatePartitionsHelper>> batchCreatePartitionsFutures = Lists.newArrayList();
for (int i = 0; i < catalogPartitions.size(); i += BATCH_CREATE_PARTITIONS_MAX_REQUEST_SIZE) {
int j = Math.min(i + BATCH_CREATE_PARTITIONS_MAX_REQUEST_SIZE, catalogPartitions.size());
final List<Partition> partitionsOnePage = catalogPartitions.subList(i, j);
batchCreatePartitionsFutures.add(this.executorService.submit(new Callable<BatchCreatePartitionsHelper>() {
@Override
public BatchCreatePartitionsHelper call() throws Exception {
return new BatchCreatePartitionsHelper(glueMetastore, dbName, tableName, catalogId, partitionsOnePage, ifNotExists)
.createPartitions();
}
}));
}
TException tException = null;
List<Partition> partitionsCreated = Lists.newArrayList();
for (Future<BatchCreatePartitionsHelper> future : batchCreatePartitionsFutures) {
try {
BatchCreatePartitionsHelper batchCreatePartitionsHelper = future.get();
partitionsCreated.addAll(batchCreatePartitionsHelper.getPartitionsCreated());
tException = tException == null ? batchCreatePartitionsHelper.getFirstTException() : tException;
deletePathForPartitions(batchCreatePartitionsHelper.getPartitionsFailed(), addedPath);
} catch (Exception e) {
logger.error("Exception thrown by BatchCreatePartitions thread pool. ", e);
}
}
if (tException != null) {
throw tException;
}
return partitionsCreated;
}