in statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java [203:230]
public Optional<PersistedValueMutation> toProtocolValueMutation() {
final String typeNameString = spec.typeName().asTypeNameString();
switch (status) {
case MODIFIED:
final TypedValue.Builder newValue =
TypedValue.newBuilder()
.setTypename(typeNameString)
.setHasValue(true)
.setValue(serialize(serializer, cachedObject));
return Optional.of(
PersistedValueMutation.newBuilder()
.setStateName(spec.name())
.setMutationType(PersistedValueMutation.MutationType.MODIFY)
.setStateValue(newValue)
.build());
case DELETED:
return Optional.of(
PersistedValueMutation.newBuilder()
.setStateName(spec.name())
.setMutationType(PersistedValueMutation.MutationType.DELETE)
.build());
case UNMODIFIED:
return Optional.empty();
default:
throw new IllegalStateException("Unknown cell status: " + status);
}
}