in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java [356:429]
public void createPartition(
TablePath tablePath,
long tableId,
PartitionAssignment partitionAssignment,
ResolvedPartitionSpec partition,
boolean ignoreIfExists) {
String partitionName = partition.getPartitionName();
Optional<TablePartition> optionalTablePartition =
getOptionalTablePartition(tablePath, partitionName);
if (optionalTablePartition.isPresent()) {
if (ignoreIfExists) {
return;
}
throw new PartitionAlreadyExistsException(
String.format(
"Partition '%s' already exists for table %s",
partition.getPartitionQualifiedName(), tablePath));
}
final int partitionNumber;
try {
partitionNumber = zookeeperClient.getPartitionNumber(tablePath);
if (partitionNumber + 1 > maxPartitionNum) {
throw new TooManyPartitionsException(
String.format(
"Exceed the maximum number of partitions for table %s, only allow %s partitions.",
tablePath, maxPartitionNum));
}
} catch (TooManyPartitionsException e) {
throw e;
} catch (Exception e) {
throw new FlussRuntimeException(
String.format(
"Get the number of partition from zookeeper failed for table %s",
tablePath),
e);
}
try {
int bucketCount = partitionAssignment.getBucketAssignments().size();
// currently, every partition has the same bucket count
int totalBuckets = bucketCount * (partitionNumber + 1);
if (totalBuckets > maxBucketNum) {
throw new TooManyBucketsException(
String.format(
"Adding partition '%s' would result in %d total buckets for table %s, exceeding the maximum of %d buckets.",
partition.getPartitionName(),
totalBuckets,
tablePath,
maxBucketNum));
}
} catch (TooManyBucketsException e) {
throw e;
} catch (Exception e) {
throw new FlussRuntimeException(
String.format("Failed to check total bucket count for table %s", tablePath), e);
}
try {
long partitionId = zookeeperClient.getPartitionIdAndIncrement();
// register partition assignments to zk first
zookeeperClient.registerPartitionAssignment(partitionId, partitionAssignment);
// then register the partition metadata to zk
zookeeperClient.registerPartition(tablePath, tableId, partitionName, partitionId);
LOG.info(
"Register partition {} to zookeeper for table [{}].", partitionName, tablePath);
} catch (Exception e) {
throw new FlussRuntimeException(
String.format(
"Register partition to zookeeper failed to create partition %s for table [%s]",
partitionName, tablePath),
e);
}
}