public void createPartition()

in fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java [356:429]


    /**
     * Creates a partition for the given table and registers it in ZooKeeper.
     *
     * <p>The method performs the following steps in order:
     *
     * <ol>
     *   <li>Looks up the partition by name; if it already exists, either returns silently (when
     *       {@code ignoreIfExists} is true) or fails.
     *   <li>Reads the current partition count of the table from ZooKeeper and rejects the request
     *       if adding one more partition would exceed {@code maxPartitionNum}.
     *   <li>Rejects the request if the resulting total bucket count of the table would exceed
     *       {@code maxBucketNum}. The total is derived from the bucket count of the given
     *       assignment multiplied by the new partition count.
     *   <li>Allocates a partition id, registers the partition assignment, and then registers the
     *       partition metadata in ZooKeeper.
     * </ol>
     *
     * <p>NOTE(review): the existence check, the limit checks and the registration are separate
     * steps; whether concurrent callers are serialized elsewhere is not visible from this method.
     *
     * @param tablePath the path of the table that owns the partition
     * @param tableId the id of the table that owns the partition
     * @param partitionAssignment the bucket assignment to register for the new partition
     * @param partition the resolved spec of the partition to create
     * @param ignoreIfExists if true, return without error when the partition already exists
     * @throws PartitionAlreadyExistsException if the partition exists and {@code ignoreIfExists}
     *     is false
     * @throws TooManyPartitionsException if the table would exceed the maximum partition count
     * @throws TooManyBucketsException if the table would exceed the maximum total bucket count
     * @throws FlussRuntimeException if reading from or registering to ZooKeeper fails, or if the
     *     bucket count check fails unexpectedly
     */
    public void createPartition(
            TablePath tablePath,
            long tableId,
            PartitionAssignment partitionAssignment,
            ResolvedPartitionSpec partition,
            boolean ignoreIfExists) {
        String partitionName = partition.getPartitionName();
        Optional<TablePartition> optionalTablePartition =
                getOptionalTablePartition(tablePath, partitionName);
        if (optionalTablePartition.isPresent()) {
            if (ignoreIfExists) {
                return;
            }
            throw new PartitionAlreadyExistsException(
                    String.format(
                            "Partition '%s' already exists for table %s",
                            partition.getPartitionQualifiedName(), tablePath));
        }

        final int partitionNumber;
        try {
            partitionNumber = zookeeperClient.getPartitionNumber(tablePath);
            // Same condition as "partitionNumber + 1 > maxPartitionNum", written so that the
            // comparison cannot overflow.
            if (partitionNumber >= maxPartitionNum) {
                throw new TooManyPartitionsException(
                        String.format(
                                "Exceed the maximum number of partitions for table %s, only allow %s partitions.",
                                tablePath, maxPartitionNum));
            }
        } catch (TooManyPartitionsException e) {
            // the limit violation must reach the caller unwrapped
            throw e;
        } catch (Exception e) {
            throw new FlussRuntimeException(
                    String.format(
                            "Get the number of partition from zookeeper failed for table %s",
                            tablePath),
                    e);
        }

        try {
            int bucketCount = partitionAssignment.getBucketAssignments().size();
            // currently, every partition has the same bucket count
            // Multiply in 64-bit arithmetic: an int product could wrap around and slip past the
            // limit check below.
            long totalBuckets = (long) bucketCount * ((long) partitionNumber + 1L);
            if (totalBuckets > maxBucketNum) {
                throw new TooManyBucketsException(
                        String.format(
                                "Adding partition '%s' would result in %d total buckets for table %s, exceeding the maximum of %d buckets.",
                                partition.getPartitionName(),
                                totalBuckets,
                                tablePath,
                                maxBucketNum));
            }
        } catch (TooManyBucketsException e) {
            // the limit violation must reach the caller unwrapped
            throw e;
        } catch (Exception e) {
            throw new FlussRuntimeException(
                    String.format("Failed to check total bucket count for table %s", tablePath), e);
        }

        try {
            long partitionId = zookeeperClient.getPartitionIdAndIncrement();
            // register partition assignments to zk first
            zookeeperClient.registerPartitionAssignment(partitionId, partitionAssignment);
            // then register the partition metadata to zk
            zookeeperClient.registerPartition(tablePath, tableId, partitionName, partitionId);
            LOG.info(
                    "Register partition {} to zookeeper for table [{}].", partitionName, tablePath);
        } catch (Exception e) {
            throw new FlussRuntimeException(
                    String.format(
                            "Register partition to zookeeper failed to create partition %s for table [%s]",
                            partitionName, tablePath),
                    e);
        }
    }