in flink-ml-iteration/flink-ml-iteration-1.15/src/main/java/org/apache/flink/iteration/proxy/state/ProxyKeyedStateBackend.java [143:193]
protected <S extends State, T> StateDescriptor<S, T> createNewDescriptor(
StateDescriptor<S, T> descriptor) {
switch (descriptor.getType()) {
case VALUE:
{
return (StateDescriptor<S, T>)
new ValueStateDescriptor<>(
stateNamePrefix.prefix(descriptor.getName()),
descriptor.getSerializer());
}
case LIST:
{
ListStateDescriptor<T> listStateDescriptor =
(ListStateDescriptor<T>) descriptor;
return (StateDescriptor<S, T>)
new ListStateDescriptor<>(
stateNamePrefix.prefix(listStateDescriptor.getName()),
listStateDescriptor.getElementSerializer());
}
case REDUCING:
{
ReducingStateDescriptor<T> reducingStateDescriptor =
(ReducingStateDescriptor<T>) descriptor;
return (StateDescriptor<S, T>)
new ReducingStateDescriptor<>(
stateNamePrefix.prefix(reducingStateDescriptor.getName()),
reducingStateDescriptor.getReduceFunction(),
reducingStateDescriptor.getSerializer());
}
case AGGREGATING:
{
AggregatingStateDescriptor<?, ?, T> aggregatingStateDescriptor =
(AggregatingStateDescriptor<?, ?, T>) descriptor;
return new AggregatingStateDescriptor(
stateNamePrefix.prefix(aggregatingStateDescriptor.getName()),
aggregatingStateDescriptor.getAggregateFunction(),
aggregatingStateDescriptor.getSerializer());
}
case MAP:
{
MapStateDescriptor<?, Map<?, ?>> mapStateDescriptor =
(MapStateDescriptor<?, Map<?, ?>>) descriptor;
return new MapStateDescriptor(
stateNamePrefix.prefix(mapStateDescriptor.getName()),
mapStateDescriptor.getKeySerializer(),
mapStateDescriptor.getValueSerializer());
}
default:
throw new UnsupportedOperationException("Unsupported state type");
}
}