flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/proxy/state/ProxyKeyedStateBackend.java [52:129]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class ProxyKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {

    private final CheckpointableKeyedStateBackend<K> wrappedBackend;

    private final StateNamePrefix stateNamePrefix;

    public ProxyKeyedStateBackend(
            CheckpointableKeyedStateBackend<K> wrappedBackend, StateNamePrefix stateNamePrefix) {
        this.wrappedBackend = wrappedBackend;
        this.stateNamePrefix = stateNamePrefix;
    }

    @Override
    public void setCurrentKey(K newKey) {
        wrappedBackend.setCurrentKey(newKey);
    }

    @Override
    public K getCurrentKey() {
        return wrappedBackend.getCurrentKey();
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return wrappedBackend.getKeySerializer();
    }

    @Override
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function)
            throws Exception {
        StateDescriptor<S, T> newDescriptor = createNewDescriptor(stateDescriptor);
        wrappedBackend.applyToAllKeys(namespace, namespaceSerializer, newDescriptor, function);
    }

    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        return wrappedBackend.getKeys(stateNamePrefix.prefix(state), namespace);
    }

    @Override
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        return wrappedBackend.getKeysAndNamespaces(stateNamePrefix.prefix(state));
    }

    @Override
    public <N, S extends State, T> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor)
            throws Exception {
        StateDescriptor<S, T> newDescriptor = createNewDescriptor(stateDescriptor);
        return wrappedBackend.getOrCreateKeyedState(namespaceSerializer, newDescriptor);
    }

    @Override
    public <N, S extends State> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        StateDescriptor<S, ?> newDescriptor = createNewDescriptor(stateDescriptor);
        return wrappedBackend.getPartitionedState(namespace, namespaceSerializer, newDescriptor);
    }

    @Override
    public void registerKeySelectionListener(KeySelectionListener<K> listener) {
        wrappedBackend.registerKeySelectionListener(listener);
    }

    @Override
    public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) {
        return wrappedBackend.deregisterKeySelectionListener(listener);
    }

    @Nonnull
    @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



flink-ml-iteration/flink-ml-iteration-common/src/main/java/org/apache/flink/iteration/proxy/state/ProxyKeyedStateBackend.java [52:129]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
public class ProxyKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {

    private final CheckpointableKeyedStateBackend<K> wrappedBackend;

    private final StateNamePrefix stateNamePrefix;

    public ProxyKeyedStateBackend(
            CheckpointableKeyedStateBackend<K> wrappedBackend, StateNamePrefix stateNamePrefix) {
        this.wrappedBackend = wrappedBackend;
        this.stateNamePrefix = stateNamePrefix;
    }

    @Override
    public void setCurrentKey(K newKey) {
        wrappedBackend.setCurrentKey(newKey);
    }

    @Override
    public K getCurrentKey() {
        return wrappedBackend.getCurrentKey();
    }

    @Override
    public TypeSerializer<K> getKeySerializer() {
        return wrappedBackend.getKeySerializer();
    }

    @Override
    public <N, S extends State, T> void applyToAllKeys(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, T> stateDescriptor,
            KeyedStateFunction<K, S> function)
            throws Exception {
        StateDescriptor<S, T> newDescriptor = createNewDescriptor(stateDescriptor);
        wrappedBackend.applyToAllKeys(namespace, namespaceSerializer, newDescriptor, function);
    }

    @Override
    public <N> Stream<K> getKeys(String state, N namespace) {
        return wrappedBackend.getKeys(stateNamePrefix.prefix(state), namespace);
    }

    @Override
    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String state) {
        return wrappedBackend.getKeysAndNamespaces(stateNamePrefix.prefix(state));
    }

    @Override
    public <N, S extends State, T> S getOrCreateKeyedState(
            TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor)
            throws Exception {
        StateDescriptor<S, T> newDescriptor = createNewDescriptor(stateDescriptor);
        return wrappedBackend.getOrCreateKeyedState(namespaceSerializer, newDescriptor);
    }

    @Override
    public <N, S extends State> S getPartitionedState(
            N namespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<S, ?> stateDescriptor)
            throws Exception {
        StateDescriptor<S, ?> newDescriptor = createNewDescriptor(stateDescriptor);
        return wrappedBackend.getPartitionedState(namespace, namespaceSerializer, newDescriptor);
    }

    @Override
    public void registerKeySelectionListener(KeySelectionListener<K> listener) {
        wrappedBackend.registerKeySelectionListener(listener);
    }

    @Override
    public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener) {
        return wrappedBackend.deregisterKeySelectionListener(listener);
    }

    @Nonnull
    @Override
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



