statefun-sdk-python/statefun/core.py (104 lines of code) (raw):

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from dataclasses import dataclass
from datetime import timedelta
from keyword import iskeyword, kwlist


@dataclass(repr=True, eq=True, order=False, frozen=True)
class SdkAddress:
    """
    An immutable address record made of a namespace, a name, an id and a typename.
    """
    namespace: str
    name: str
    id: str
    typename: str


class TypeSerializer(metaclass=abc.ABCMeta):
    """
    A base class for a TypeSerializer. A TypeSerializer is responsible for serializing and deserializing a specific
    value type.
    """
    # NOTE: the docstring has to be the first statement of the class body, otherwise it is just a discarded
    # string expression and __doc__ stays None.
    __slots__ = ()

    @abc.abstractmethod
    def serialize(self, value) -> bytes:
        """
        Serialize the given value to bytes.

        :param value: the value to serialize.
        :return: a byte representation of the given value.
        """
        pass

    @abc.abstractmethod
    def deserialize(self, byte: bytes):
        """
        Deserialize a value from the given byte representation.

        :param byte: the bytes that represent the value to deserialize.
        :return: the deserialized value.
        """
        pass


class Type(metaclass=abc.ABCMeta):
    """
    A base representation of a Stateful Function value type.
    """
    __slots__ = ("typename",)

    def __init__(self, typename: str):
        """
        :param typename: a TypeName represented as a string of the form <namespace>/<name> of this type.
        :raises ValueError: if typename is None or empty.
        """
        if not typename:
            raise ValueError("typename can not be missing")
        self.typename = typename

    @abc.abstractmethod
    def serializer(self) -> TypeSerializer:
        """
        :return: a serializer for this type.
        """
        pass


class ValueSpec(object):
    """
    A specification of a persisted value: its name, its Type and an optional expiration policy.

    After construction the following attributes are set:
    name        - the validated state name.
    type        - the value's Type.
    duration    - the expiration duration in whole milliseconds, or 0 when no expiration was given.
    after_call  - True iff expire_after_call was given.
    after_write - True iff expire_after_write was given.
    """
    __slots__ = ("name", "type", "duration", "after_call", "after_write")

    def __init__(self,
                 name: str,
                 type: Type,
                 expire_after_call: timedelta = None,
                 expire_after_write: timedelta = None):
        """
        ValueSpec a specification of a persisted value.

        :param name: a user defined state name, used to represent this value. A name must be lower case, alphanumeric
        string without spaces, that starts with either a letter or an underscore (_)
        :param type: the value's Type. (see statefun.Type)
        :param expire_after_call: expire (remove) this value if a call to this function hasn't been made for the given
        duration.
        :param expire_after_write: expire this value if it wasn't written to for the given duration.
        :raises ValueError: if name is missing, is not a valid identifier, is a Python keyword or is not lower case;
        if type is missing; or if both expire_after_call and expire_after_write are set.
        :raises TypeError: if type is not an instance of Type.
        """
        if not name:
            raise ValueError("name can not be missing.")
        if not name.isidentifier():
            raise ValueError(
                f"invalid name {name}. A spec name can only contains alphanumeric letters (a-z) and (0-9), "
                f"or underscores ( "
                f"_). A valid identifier cannot start with a number, or contain any spaces.")
        if iskeyword(name):
            # a keyword can not be accessed as a plain attribute (context.store.<name>), so it is rejected.
            forbidden = '\n'.join(kwlist)
            raise ValueError(
                f"invalid spec name {name} (Python SDK specifically). since {name} will result as an attribute on "
                f"context.store.\n"
                f"The following names are forbidden:\n {forbidden}")
        if not name.islower():
            raise ValueError(f"Only lower case names are allowed, {name} is given.")
        self.name = name

        if not type:
            raise ValueError("type can not be missing.")
        if not isinstance(type, Type):
            raise TypeError("type is not a StateFun type.")
        self.type = type

        if expire_after_call and expire_after_write:
            # both can not be set.
            raise ValueError("Either expire_after_call or expire_after_write can be set, but not both.")

        # NOTE: a zero timedelta is falsy, and is therefore treated exactly like "no expiration".
        if expire_after_call:
            self.duration = int(expire_after_call.total_seconds() * 1000.0)
            self.after_call = True
            self.after_write = False
        elif expire_after_write:
            self.duration = int(expire_after_write.total_seconds() * 1000.0)
            self.after_call = False
            self.after_write = True
        else:
            self.duration = 0
            self.after_call = False
            self.after_write = False


def parse_typename(typename):
    """
    Parse a TypeName string into a namespace, type pair.

    The split happens on the LAST "/", so "a/b/c" is parsed as namespace "a/b" and name "c".

    :param typename: a string of the form <namespace>/<type>
    :return: a tuple of a namespace type.
    :raises ValueError: if typename is None, has no "/" separator, or has an empty namespace or an empty name.
    """
    if typename is None:
        raise ValueError("function type must be provided")
    namespace, separator, name = typename.rpartition("/")
    if not separator:
        raise ValueError("function type must be of the form namespace/name")
    if not namespace:
        raise ValueError("function type's namespace must not be empty")
    if not name:
        raise ValueError("function type's name must not be empty")
    return namespace, name


def simple_type(typename=None, serialize_fn=None, deserialize_fn=None) -> Type:
    """
    Create a user defined Type, simply by providing two functions. One for serializing a value to bytes, and one for
    deserializing the value from bytes.

    For example:

        import json

        tpe = simple_type("org.foo.bar/User",
                          serialize_fn=lambda user: json.dumps(user).encode("utf-8"),
                          deserialize_fn=json.loads)

    this defines a StateFun type (Type) of the typename org.foo.bar/User and it is a JSON object.
    (json.dumps returns a str, hence the explicit encode to bytes in the example above.)

    :param typename: this type's TypeName.
    :param serialize_fn: a function that is able to serialize values of this type.
    :param deserialize_fn: a function that is able to deserialize bytes into values of this type.
    :return: a Type definition.
    :raises ValueError: if typename, serialize_fn or deserialize_fn is missing.
    """
    return SimpleType(typename, serialize_fn, deserialize_fn)


# ----------------------------------------------------------------------------------------------------------
# Internal
# ----------------------------------------------------------------------------------------------------------

class SimpleType(Type):
    """
    A Type whose serializer delegates to a pair of user provided functions.
    """
    # "typename" is already a slot of Type; listing it here again would only create a second, shadowing slot.
    __slots__ = ("_ser",)

    def __init__(self, typename, serialize_fn, deserialize_fn):
        """
        :param typename: this type's TypeName.
        :param serialize_fn: a function that serializes a value of this type.
        :param deserialize_fn: a function that deserializes a value of this type.
        :raises ValueError: if any of the arguments is missing.
        """
        super().__init__(typename)
        if not serialize_fn:
            raise ValueError("serialize_fn is missing")
        if not deserialize_fn:
            raise ValueError("deserialize_fn is missing")
        self._ser = UserTypeSerializer(serialize_fn, deserialize_fn)

    def serializer(self) -> TypeSerializer:
        """
        :return: the serializer wrapping the user provided functions.
        """
        return self._ser


class UserTypeSerializer(TypeSerializer):
    """
    A TypeSerializer that forwards serialize/deserialize calls to user provided functions.
    """
    __slots__ = ("serialize_fn", "deserialize_fn")

    def __init__(self, serialize_fn, deserialize_fn):
        self.serialize_fn = serialize_fn
        self.deserialize_fn = deserialize_fn

    def serialize(self, value):
        """
        :param value: the value to serialize.
        :return: whatever the user provided serialize_fn returns for value.
        """
        return self.serialize_fn(value)

    def deserialize(self, byte_string):
        """
        :param byte_string: the serialized representation to deserialize.
        :return: whatever the user provided deserialize_fn returns for byte_string.
        """
        return self.deserialize_fn(byte_string)