statefun-sdk-python/statefun/messages.py (106 lines of code) (raw):

################################################################################ # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ import typing import statefun.wrapper_types as wrapper_types from statefun.core import Type from statefun.request_reply_pb2 import TypedValue from statefun.utils import to_typed_value class Message(object): __slots__ = ("target_typename", "target_id", "typed_value") def __init__(self, target_typename: str, target_id: str, typed_value: TypedValue): """ A Stateful Functions Message. :param target_typename: The TypeName represented as a string of the form <namespace>/<name> of the target function. :param target_id: The id of the target function :param typed_value: The internal protobuf representation of the typed_value. """ if not target_typename: raise ValueError("target_typename can not be missing") if not target_id: raise ValueError("target_id can not be missing") if not typed_value: raise ValueError("typed_value can not be missing") self.target_typename = target_typename self.target_id = target_id self.typed_value = typed_value def is_int(self): return self.is_type(wrapper_types.IntType) def as_int(self): return self.as_type(wrapper_types.IntType) def is_bool(self): return self.is_type(wrapper_types.BoolType) def as_bool(self) -> typing.Optional[bool]: return self.as_type(wrapper_types.BoolType) def is_long(self): return self.is_type(wrapper_types.LongType) def as_long(self) -> typing.Optional[int]: return self.as_type(wrapper_types.LongType) def is_string(self): return self.is_type(wrapper_types.StringType) def as_string(self) -> typing.Optional[str]: return self.as_type(wrapper_types.StringType) def is_float(self): return self.is_type(wrapper_types.FloatType) def as_float(self) -> typing.Optional[float]: return self.as_type(wrapper_types.FloatType) def is_double(self): return self.is_type(wrapper_types.DoubleType) def as_double(self) -> typing.Optional[float]: return self.as_type(wrapper_types.DoubleType) def is_type(self, tpe: Type) -> bool: return self.typed_value.typename == tpe.typename def value_typename(self) -> str: return self.typed_value.typename def raw_value(self) -> typing.Optional[bytes]: tv = self.typed_value return tv.value if tv.has_value else None def as_type(self, tpe: Type): tv = self.typed_value if tv.has_value: serializer = tpe.serializer() return serializer.deserialize(tv.value) else: return None class EgressMessage(object): __slots__ = ("typename", "typed_value") def __init__(self, typename: str, typed_value: TypedValue): if not typename: raise ValueError("typename is missing") if not typed_value: raise ValueError("value is missing") self.typename = typename self.typed_value = typed_value def value_typename(self) -> str: return self.typed_value.typename def raw_value(self) -> typing.Optional[bytes]: tv = self.typed_value if tv.has_value: return tv.value else: return None PRIMITIVE_GETTERS = {"int_value": wrapper_types.IntType, "float_value": wrapper_types.FloatType, "long_value": wrapper_types.LongType, "str_value": wrapper_types.StringType, "double_value": wrapper_types.DoubleType, "bool_value": wrapper_types.BoolType} def message_builder(target_typename: str, target_id: str, **kwargs) -> Message: """ Build a Message that can be sent to any other function. :param target_typename: The TypeName represented as a string of the form <namespace>/<name> of the target function. :param target_id: The id of the target function :param kwargs: This specify the value type to attach to this message. The following arguments are supported: int_value=<an int>, float_value=<a float> long_value=<a signed 64 bit integer> str_value=<str> double_value=<double> bool_value=<bool> ... value=<arbitrary value>, value_type=<a StateFun Type for this value> :return: A Message object, that can be sent. """ if len(kwargs) == 2: value, value_type = kwargs["value"], kwargs["value_type"] elif len(kwargs) == 1: # expecting: <type>_value : value # for example one of the following: # int_value=1 # str_value="hello world" # long_value= 5511 type_keyword, value = next(iter(kwargs.items())) value_type = PRIMITIVE_GETTERS.get(type_keyword) else: raise TypeError(f"Wrong number of value keywords given: {kwargs}, there must be exactly one of:" f"\nint_value=.." f"\nfloat_value.." f"\netc'" f"\nor:" f"\nvalue=.. ,value_type=.. ") if value is None: raise ValueError("value can not be missing") if not value_type: raise ValueError( "Could not deduce the value type, please specify the type explicitly. via passing: value=<the value>, " "value_type=<the type>") typed_value = to_typed_value(type=value_type, value=value) return Message(target_typename=target_typename, target_id=target_id, typed_value=typed_value) def egress_message_builder(target_typename: str, value: typing.Any, value_type: Type): """ Create a generic egress record. To use Kafka specific egress please use kafka_egress_message(), and for Kinesis please use kinesis_egress_message(). """ if not target_typename: raise ValueError("target typename is missing") if value is None: raise ValueError("value can not be missing") typed_value = to_typed_value(type=value_type, value=value) return EgressMessage(typename=target_typename, typed_value=typed_value)