flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/proxy/state/ProxyKeyedStateBackend.java [139:234]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                namespaceSerializer, newDescriptor, snapshotTransformFactory);
    }

    /**
     * Creates a copy of the given state descriptor whose name is the original name passed through
     * {@code stateNamePrefix}.
     *
     * <p>Only the name and the type-specific constructor arguments read below (serializer, element
     * serializer, reduce/aggregate function, key/value serializers) are handed to the copy.
     *
     * @param descriptor the descriptor to copy
     * @return a new descriptor of the same state type carrying the prefixed name
     * @throws UnsupportedOperationException if the descriptor type is none of VALUE, LIST,
     *     REDUCING, AGGREGATING or MAP
     */
    @SuppressWarnings("unchecked")
    protected <S extends State, T> StateDescriptor<S, T> createNewDescriptor(
            StateDescriptor<S, T> descriptor) {
        switch (descriptor.getType()) {
            case VALUE:
                {
                    String prefixedName = stateNamePrefix.prefix(descriptor.getName());
                    ValueStateDescriptor<T> renamed =
                            new ValueStateDescriptor<>(prefixedName, descriptor.getSerializer());
                    return (StateDescriptor<S, T>) renamed;
                }
            case LIST:
                {
                    ListStateDescriptor<T> source = (ListStateDescriptor<T>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    ListStateDescriptor<T> renamed =
                            new ListStateDescriptor<>(prefixedName, source.getElementSerializer());
                    return (StateDescriptor<S, T>) renamed;
                }
            case REDUCING:
                {
                    ReducingStateDescriptor<T> source = (ReducingStateDescriptor<T>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    ReducingStateDescriptor<T> renamed =
                            new ReducingStateDescriptor<>(
                                    prefixedName,
                                    source.getReduceFunction(),
                                    source.getSerializer());
                    return (StateDescriptor<S, T>) renamed;
                }
            case AGGREGATING:
                {
                    AggregatingStateDescriptor<?, ?, T> source =
                            (AggregatingStateDescriptor<?, ?, T>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    // Raw construction is kept on purpose: the wildcard-typed source cannot be
                    // matched against the constructor's type parameters.
                    return new AggregatingStateDescriptor(
                            prefixedName, source.getAggregateFunction(), source.getSerializer());
                }
            case MAP:
                {
                    MapStateDescriptor<?, Map<?, ?>> source =
                            (MapStateDescriptor<?, Map<?, ?>>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    // Raw construction is kept on purpose, as for the aggregating case.
                    return new MapStateDescriptor(
                            prefixedName, source.getKeySerializer(), source.getValueSerializer());
                }
            default:
                throw new UnsupportedOperationException("Unsupported state type");
        }
    }

    /**
     * Returns the key group range reported by the wrapped backend.
     *
     * @return the wrapped backend's key group range
     */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        final KeyGroupRange delegateRange = wrappedBackend.getKeyGroupRange();
        return delegateRange;
    }

    /**
     * Forwards the savepoint request to the wrapped backend.
     *
     * @return the savepoint resources produced by the wrapped backend
     * @throws Exception if the wrapped backend fails
     */
    @Nonnull
    @Override
    public SavepointResources<K> savepoint() throws Exception {
        final SavepointResources<K> delegateResources = wrappedBackend.savepoint();
        return delegateResources;
    }

    /**
     * Intentionally does nothing: disposing this proxy does not dispose the wrapped backend.
     */
    @Override
    public void dispose() {
        // Do not dispose for proxy.
        // NOTE(review): presumably the wrapped backend is disposed by whoever owns it — confirm.
    }

    /**
     * Intentionally does nothing: closing this proxy does not close the wrapped backend.
     *
     * @throws IOException declared by the signature; this implementation never throws it
     */
    @Override
    public void close() throws IOException {
        // Do not close for proxy.
        // NOTE(review): presumably the wrapped backend is closed by whoever owns it — confirm.
    }

    /**
     * Creates a priority queue on the wrapped backend, registering it under the given state name
     * after it has been passed through {@code stateNamePrefix}.
     *
     * @param stateName the caller-visible name of the queue state
     * @param byteOrderedElementSerializer serializer handed unchanged to the wrapped backend
     * @return the queue created by the wrapped backend
     */
    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        final String prefixedStateName = stateNamePrefix.prefix(stateName);
        return wrappedBackend.create(prefixedStateName, byteOrderedElementSerializer);
    }

    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        return wrappedBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/proxy/state/ProxyKeyedStateBackend.java [139:234]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
                namespaceSerializer, newDescriptor, snapshotTransformFactory);
    }

    /**
     * Creates a copy of the given state descriptor whose name is the original name passed through
     * {@code stateNamePrefix}.
     *
     * <p>Only the name and the type-specific constructor arguments read below (serializer, element
     * serializer, reduce/aggregate function, key/value serializers) are handed to the copy.
     *
     * @param descriptor the descriptor to copy
     * @return a new descriptor of the same state type carrying the prefixed name
     * @throws UnsupportedOperationException if the descriptor type is none of VALUE, LIST,
     *     REDUCING, AGGREGATING or MAP
     */
    @SuppressWarnings("unchecked")
    protected <S extends State, T> StateDescriptor<S, T> createNewDescriptor(
            StateDescriptor<S, T> descriptor) {
        switch (descriptor.getType()) {
            case VALUE:
                {
                    String prefixedName = stateNamePrefix.prefix(descriptor.getName());
                    ValueStateDescriptor<T> renamed =
                            new ValueStateDescriptor<>(prefixedName, descriptor.getSerializer());
                    return (StateDescriptor<S, T>) renamed;
                }
            case LIST:
                {
                    ListStateDescriptor<T> source = (ListStateDescriptor<T>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    ListStateDescriptor<T> renamed =
                            new ListStateDescriptor<>(prefixedName, source.getElementSerializer());
                    return (StateDescriptor<S, T>) renamed;
                }
            case REDUCING:
                {
                    ReducingStateDescriptor<T> source = (ReducingStateDescriptor<T>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    ReducingStateDescriptor<T> renamed =
                            new ReducingStateDescriptor<>(
                                    prefixedName,
                                    source.getReduceFunction(),
                                    source.getSerializer());
                    return (StateDescriptor<S, T>) renamed;
                }
            case AGGREGATING:
                {
                    AggregatingStateDescriptor<?, ?, T> source =
                            (AggregatingStateDescriptor<?, ?, T>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    // Raw construction is kept on purpose: the wildcard-typed source cannot be
                    // matched against the constructor's type parameters.
                    return new AggregatingStateDescriptor(
                            prefixedName, source.getAggregateFunction(), source.getSerializer());
                }
            case MAP:
                {
                    MapStateDescriptor<?, Map<?, ?>> source =
                            (MapStateDescriptor<?, Map<?, ?>>) descriptor;
                    String prefixedName = stateNamePrefix.prefix(source.getName());
                    // Raw construction is kept on purpose, as for the aggregating case.
                    return new MapStateDescriptor(
                            prefixedName, source.getKeySerializer(), source.getValueSerializer());
                }
            default:
                throw new UnsupportedOperationException("Unsupported state type");
        }
    }

    /**
     * Returns the key group range reported by the wrapped backend.
     *
     * @return the wrapped backend's key group range
     */
    @Override
    public KeyGroupRange getKeyGroupRange() {
        final KeyGroupRange delegateRange = wrappedBackend.getKeyGroupRange();
        return delegateRange;
    }

    /**
     * Forwards the savepoint request to the wrapped backend.
     *
     * @return the savepoint resources produced by the wrapped backend
     * @throws Exception if the wrapped backend fails
     */
    @Nonnull
    @Override
    public SavepointResources<K> savepoint() throws Exception {
        final SavepointResources<K> delegateResources = wrappedBackend.savepoint();
        return delegateResources;
    }

    /**
     * Intentionally does nothing: disposing this proxy does not dispose the wrapped backend.
     */
    @Override
    public void dispose() {
        // Do not dispose for proxy.
        // NOTE(review): presumably the wrapped backend is disposed by whoever owns it — confirm.
    }

    /**
     * Intentionally does nothing: closing this proxy does not close the wrapped backend.
     *
     * @throws IOException declared by the signature; this implementation never throws it
     */
    @Override
    public void close() throws IOException {
        // Do not close for proxy.
        // NOTE(review): presumably the wrapped backend is closed by whoever owns it — confirm.
    }

    /**
     * Creates a priority queue on the wrapped backend, registering it under the given state name
     * after it has been passed through {@code stateNamePrefix}.
     *
     * @param stateName the caller-visible name of the queue state
     * @param byteOrderedElementSerializer serializer handed unchanged to the wrapped backend
     * @return the queue created by the wrapped backend
     */
    @Nonnull
    @Override
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        final String prefixedStateName = stateNamePrefix.prefix(stateName);
        return wrappedBackend.create(prefixedStateName, byteOrderedElementSerializer);
    }

    @Nonnull
    @Override
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        return wrappedBackend.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



