protected StateDescriptor createNewDescriptor()

in flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/proxy/state/ProxyKeyedStateBackend.java [143:193]


    /**
     * Creates a copy of the given state descriptor whose name is the original name run through
     * {@code stateNamePrefix.prefix(...)}.
     *
     * <p>Besides the serializer(s) and, where applicable, the reduce / aggregate function, the
     * copy also inherits the settings that would otherwise be silently lost by re-creating the
     * descriptor:
     *
     * <ul>
     *   <li>the default value of a value-state descriptor, and
     *   <li>the time-to-live configuration, if TTL is enabled on the original descriptor.
     * </ul>
     *
     * @param descriptor the descriptor to copy; its serializer is read via {@code
     *     getSerializer()} (or the element / key / value serializer accessors), so it is expected
     *     to be initialized already
     * @param <S> the type of the state described by the descriptor
     * @param <T> the value type of the descriptor
     * @return a new descriptor of the same kind registered under the prefixed name
     * @throws UnsupportedOperationException if the descriptor type is not one of {@code VALUE},
     *     {@code LIST}, {@code REDUCING}, {@code AGGREGATING} or {@code MAP}
     */
    protected <S extends State, T> StateDescriptor<S, T> createNewDescriptor(
            StateDescriptor<S, T> descriptor) {
        String prefixedName = stateNamePrefix.prefix(descriptor.getName());
        StateDescriptor<S, T> newDescriptor;

        switch (descriptor.getType()) {
            case VALUE:
                {
                    // The three-argument constructor is deprecated, but it is the only way to
                    // carry over a default value configured on the original descriptor. With
                    // a null default it behaves like the two-argument constructor.
                    @SuppressWarnings("deprecation")
                    ValueStateDescriptor<T> valueStateDescriptor =
                            new ValueStateDescriptor<>(
                                    prefixedName,
                                    descriptor.getSerializer(),
                                    descriptor.getDefaultValue());
                    newDescriptor = (StateDescriptor<S, T>) valueStateDescriptor;
                    break;
                }
            case LIST:
                {
                    ListStateDescriptor<T> listStateDescriptor =
                            (ListStateDescriptor<T>) descriptor;
                    newDescriptor =
                            (StateDescriptor<S, T>)
                                    new ListStateDescriptor<>(
                                            prefixedName,
                                            listStateDescriptor.getElementSerializer());
                    break;
                }
            case REDUCING:
                {
                    ReducingStateDescriptor<T> reducingStateDescriptor =
                            (ReducingStateDescriptor<T>) descriptor;
                    newDescriptor =
                            (StateDescriptor<S, T>)
                                    new ReducingStateDescriptor<>(
                                            prefixedName,
                                            reducingStateDescriptor.getReduceFunction(),
                                            reducingStateDescriptor.getSerializer());
                    break;
                }
            case AGGREGATING:
                {
                    // The input / accumulator / output types are unknown here, hence the
                    // wildcard view and the raw constructor call.
                    AggregatingStateDescriptor<?, ?, ?> aggregatingStateDescriptor =
                            (AggregatingStateDescriptor<?, ?, ?>) descriptor;
                    newDescriptor =
                            new AggregatingStateDescriptor(
                                    prefixedName,
                                    aggregatingStateDescriptor.getAggregateFunction(),
                                    aggregatingStateDescriptor.getSerializer());
                    break;
                }
            case MAP:
                {
                    // The key / value types are unknown here, hence the wildcard view and the
                    // raw constructor call.
                    MapStateDescriptor<?, ?> mapStateDescriptor =
                            (MapStateDescriptor<?, ?>) descriptor;
                    newDescriptor =
                            new MapStateDescriptor(
                                    prefixedName,
                                    mapStateDescriptor.getKeySerializer(),
                                    mapStateDescriptor.getValueSerializer());
                    break;
                }
            default:
                throw new UnsupportedOperationException("Unsupported state type");
        }

        // Re-creating the descriptor resets its TTL configuration; copy it over so that state
        // accessed through the prefixed descriptor still expires as the caller configured.
        // Only done when TTL is enabled, so a disabled configuration is never passed to
        // enableTimeToLive.
        if (descriptor.getTtlConfig().isEnabled()) {
            newDescriptor.enableTimeToLive(descriptor.getTtlConfig());
        }
        return newDescriptor;
    }