fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/enumerator/FlinkSourceEnumerator.java [289:331]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionInfos) {
        final Set<Partition> newPartitions =
                fetchedPartitionInfos.stream()
                        .map(p -> new Partition(p.getPartitionId(), p.getPartitionName()))
                        .collect(Collectors.toSet());
        final Set<Partition> removedPartitions = new HashSet<>();

        Set<Partition> assignedOrPendingPartitions = new HashSet<>();
        assignedPartitions.forEach(
                (partitionId, partitionName) ->
                        assignedOrPendingPartitions.add(new Partition(partitionId, partitionName)));

        pendingSplitAssignment.values().stream()
                .flatMap(Collection::stream)
                .forEach(
                        split -> {
                            long partitionId =
                                    checkNotNull(
                                            split.getTableBucket().getPartitionId(),
                                            "partition id shouldn't be null for the splits of partitioned table.");
                            String partitionName =
                                    checkNotNull(
                                            split.getPartitionName(),
                                            "partition name shouldn't be null for the splits of partitioned table.");
                            assignedOrPendingPartitions.add(
                                    new Partition(partitionId, partitionName));
                        });

        assignedOrPendingPartitions.forEach(
                p -> {
                    if (!newPartitions.remove(p)) {
                        removedPartitions.add(p);
                    }
                });

        if (!removedPartitions.isEmpty()) {
            LOG.info("Discovered removed partitions: {}", removedPartitions);
        }
        if (!newPartitions.isEmpty()) {
            LOG.info("Discovered new partitions: {}", newPartitions);
        }

        return new PartitionChange(newPartitions, removedPartitions);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



fluss-lakehouse/fluss-lakehouse-paimon/src/main/java/com/alibaba/fluss/lakehouse/paimon/source/enumerator/FlinkSourceEnumerator.java [569:610]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
    private PartitionChange getPartitionChange(Set<PartitionInfo> fetchedPartitionInfos) {
        final Set<Partition> newPartitions =
                fetchedPartitionInfos.stream()
                        .map(p -> new Partition(p.getPartitionId(), p.getPartitionName()))
                        .collect(Collectors.toSet());
        final Set<Partition> removedPartitions = new HashSet<>();

        Set<Partition> assignedOrPendingPartitions = new HashSet<>();
        assignedPartitions.forEach(
                (partitionId, partitionName) ->
                        assignedOrPendingPartitions.add(new Partition(partitionId, partitionName)));
        pendingSplitAssignment.values().stream()
                .flatMap(Collection::stream)
                .forEach(
                        split -> {
                            long partitionId =
                                    checkNotNull(
                                            split.getTableBucket().getPartitionId(),
                                            "partition id shouldn't be null for the splits of partitioned table.");
                            String partitionName =
                                    checkNotNull(
                                            split.getPartitionName(),
                                            "partition name shouldn't be null for the splits of partitioned table.");
                            assignedOrPendingPartitions.add(
                                    new Partition(partitionId, partitionName));
                        });

        assignedOrPendingPartitions.forEach(
                p -> {
                    if (!newPartitions.remove(p)) {
                        removedPartitions.add(p);
                    }
                });

        if (!removedPartitions.isEmpty()) {
            LOG.info("Discovered removed partitions: {}", removedPartitions);
        }
        if (!newPartitions.isEmpty()) {
            LOG.info("Discovered new partitions: {}", newPartitions);
        }

        return new PartitionChange(newPartitions, removedPartitions);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



