in statefun-sdk-python/statefun/request_reply_v3.py [0:0]
def collect_failure(missing_state_specs: typing.List[ValueSpec]) -> FromFunction:
    """Build a ``FromFunction`` message listing the given state specs as missing.

    One entry is appended to ``incomplete_invocation_context.missing_values``
    per spec, carrying the spec's name, its type's typename and an expiration
    spec derived from the ``after_write`` / ``after_call`` / ``duration``
    attributes of the spec.

    :param missing_state_specs: the value specs to report as missing.
    :return: a new ``FromFunction`` message populated with the missing values.
    :raises ValueError: if an expiration is configured but neither
        ``after_call`` nor ``after_write`` is truthy when the mode is selected.
    """
    expire_mode = FromFunction.ExpirationSpec.ExpireMode
    response = FromFunction()
    missing_values = response.incomplete_invocation_context.missing_values

    for spec in missing_state_specs:
        entry = missing_values.add()
        entry.state_name = spec.name
        entry.type_typename = spec.type.typename

        expiration = FromFunction.ExpirationSpec()
        if spec.after_write or spec.after_call:
            # An expiration is configured: record its duration, then pick the
            # mode. ``after_call`` takes precedence over ``after_write``.
            expiration.expire_after_millis = spec.duration
            if spec.after_call:
                expiration.mode = expire_mode.AFTER_INVOKE
            elif spec.after_write:
                expiration.mode = expire_mode.AFTER_WRITE
            else:
                # Defensive guard; not expected to trigger given the outer check.
                raise ValueError("Unexpected state expiration mode.")
        else:
            # Neither flag is set: the state never expires.
            expiration.mode = expire_mode.NONE

        entry.expiration_spec.CopyFrom(expiration)

    return response