in statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java [67:92]
void updateStateValues(List<PersistedValueMutation> valueMutations) {
for (PersistedValueMutation mutate : valueMutations) {
final String stateName = mutate.getStateName();
switch (mutate.getMutationType()) {
case DELETE:
{
getStateHandleOrThrow(stateName).clear();
break;
}
case MODIFY:
{
final RemotePersistedValue registeredHandle = getStateHandleOrThrow(stateName);
final TypedValue newStateValue = mutate.getStateValue();
validateType(registeredHandle, newStateValue.getTypename());
registeredHandle.set(newStateValue.getValue().toByteArray());
break;
}
case UNRECOGNIZED:
throw new IllegalStateException(
"Received an UNRECOGNIZED PersistedValueMutation type. This may be caused by a mismatch or incompatibility with the remote function SDK version and the Stateful Functions version.");
default:
throw new IllegalStateException("Unexpected value: " + mutate.getMutationType());
}
}
}