in statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java [81:115]
public static ResolutionResult resolve(
    Map<String, ValueSpec<?>> registeredSpecs,
    List<ToFunction.PersistedValue> protocolProvidedValues) {
  // Matches every user-registered ValueSpec (by name) against the persisted values that
  // arrived with the request. The outcome is one of two shapes:
  //   - every spec had a matching value: the result carries the resolved contexts only;
  //   - at least one spec had no matching value: the result carries the missing specs only.
  final int specCount = registeredSpecs.size();

  // contexts used later to serialize and deserialize the user's state.
  final List<StateValueContext<?>> resolved = new ArrayList<>(specCount);

  // specs registered by the user for which the runtime sent no value (this can happen upon
  // an initial request). Left as null until the first miss, since the common path has none.
  List<ValueSpec<?>> missing = null;

  for (ValueSpec<?> valueSpec : registeredSpecs.values()) {
    final ToFunction.PersistedValue provided =
        findPersistedValueByName(protocolProvidedValues, valueSpec.name());

    if (provided == null) {
      // the runtime doesn't know (yet) about this state; remember it.
      if (missing == null) {
        missing = new ArrayList<>(specCount);
      }
      missing.add(valueSpec);
      continue;
    }

    resolved.add(new StateValueContext<>(valueSpec, provided));
  }

  // a single miss discards the resolved contexts: only the missing specs are reported.
  return (missing != null)
      ? new ResolutionResult(null, missing)
      : new ResolutionResult(resolved, null);
}