azurefunctions-extensions-base/azurefunctions/extensions/base/utils.py (140 lines of code) (raw):
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import functools
import inspect
import json
import re
from abc import ABC
from enum import Enum
from typing import Any, Callable, Optional

from . import meta
# Pattern used by is_snake_case. It accepts a string that starts either with
# letters, optional digits and an underscore, or with one or more underscores
# followed by a letter or digit; any run of word characters may follow.
SNAKE_CASE_RE = re.compile(r"^([a-zA-Z]+\d*_|_+[a-zA-Z\d])\w*$")
# Pattern used by is_word. It accepts one or more letters followed by
# optional trailing digits, and nothing else (no underscores).
WORD_RE = re.compile(r"^([a-zA-Z]+\d*)$")
class StringifyEnum(Enum):
    """Enum base class whose members print as their bare member name.

    ``str(member)`` yields ``"MEMBER"`` instead of the default
    ``"ClassName.MEMBER"`` rendering.
    """

    def __str__(self):
        member_name = self.name
        return str(member_name)
class StringifyEnumJsonEncoder(json.JSONEncoder):
    """JSON encoder that falls back to ``str()`` for unsupported objects."""

    def default(self, o):
        # Only reached for objects the stock encoder cannot serialize; the
        # object is emitted as its string form.
        text = str(o)
        return text
class BuildDictMeta(type):
    """Metaclass that instruments binding classes for dictionary building."""

    def __new__(mcs, name, bases, dct):
        """BuildDictMeta will apply to every binding.

        It will apply :meth:`add_to_dict` decorator to :meth:`__init__` of
        every binding class to collect list of params to include in building
        json dictionary which corresponds to function.json in legacy app.
        It will also apply :meth:`skip_none` to :meth:`get_dict_repr` to
        enable json dictionary generated for every binding has non-empty
        value fields. It is needed for enabling binding param optionality.
        """
        cls = super().__new__(mcs, name, bases, dct)
        setattr(cls, "__init__", cls.add_to_dict(getattr(cls, "__init__")))
        setattr(cls, "get_dict_repr", cls.skip_none(getattr(cls, "get_dict_repr")))
        return cls

    @staticmethod
    def skip_none(func):
        """Wrap ``func`` so its result is passed through :meth:`clean_nones`.

        :param func: Callable returning a (possibly nested) dict or list.
        :return: Wrapper with the same metadata as ``func``.
        """

        @functools.wraps(func)
        def wrapper(*args, **kw):
            return BuildDictMeta.clean_nones(func(*args, **kw))

        return wrapper

    @staticmethod
    def add_to_dict(func: Callable[..., Any]):
        """Wrap an ``__init__`` so the instance records its init parameters.

        After ``func`` runs, the instance (first positional argument) gets an
        ``init_params`` attribute listing the parameter names of ``func``
        followed by the names of the keyword arguments actually passed.
        Keyword arguments that ``func`` did not store on the instance are
        set as attributes.

        :param func: The ``__init__`` to wrap.
        :return: Wrapper with the same metadata as ``func``.
        :raises ValueError: When the wrapper is called without positional
            arguments, i.e. not as an object method.
        """

        # functools.wraps sets __wrapped__, which inspect.signature follows.
        # Without it, a subclass that inherits __init__ gets the already
        # wrapped function wrapped again and the recorded parameter names
        # degrade to the wrapper's own ("args", "kwargs").
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if not args:
                raise ValueError(
                    f"{func.__name__} has no args. Please ensure func is an "
                    f"object method."
                )
            func(*args, **kwargs)
            self = args[0]
            init_params = list(inspect.signature(func).parameters)
            init_params.extend(kwargs)
            for key, value in kwargs.items():
                # Do not overwrite what __init__ already stored.
                if not hasattr(self, key):
                    setattr(self, key, value)
            self.init_params = init_params

        return wrapper

    @staticmethod
    def clean_nones(value):
        """Recursively remove all None values from dictionaries and lists.

        :param value: Any value; only dicts and lists are descended into.
        :return: A new dict or list without None entries, or ``value``
            itself when it is neither a dict nor a list.
        """
        if isinstance(value, list):
            return [BuildDictMeta.clean_nones(x) for x in value if x is not None]
        if isinstance(value, dict):
            return {
                key: BuildDictMeta.clean_nones(val)
                for key, val in value.items()
                if val is not None
            }
        return value
# Enums
class BindingDirection(StringifyEnum):
    """Direction of the binding used in function.json"""

    #: Input binding direction.
    IN = 0
    #: Output binding direction.
    OUT = 1
    #: Some bindings support a special binding direction.
    INOUT = 2
class DataType(StringifyEnum):
    """Data type of the binding used in function.json"""

    #: Parse binding argument as undefined.
    UNDEFINED = 0
    #: Parse binding argument as string.
    STRING = 1
    #: Parse binding argument as binary.
    BINARY = 2
    #: Parse binding argument as stream.
    STREAM = 3
class Binding(ABC):
    """Abstract binding class which captures common attributes and
    functions. :meth:`get_dict_repr` can auto generate the function.json for
    every binding, the only restriction is ***ENSURE*** __init__ parameter
    names of any binding class are snake case form of corresponding
    attribute in function.json when new binding classes are created.
    Ref: https://aka.ms/azure-function-binding-http"""

    # Init parameter names that get_dict_repr never copies into the binding
    # dictionary under their camel-cased name.
    EXCLUDED_INIT_PARAMS = {"self", "kwargs", "type", "data_type", "direction"}

    @staticmethod
    def get_binding_name() -> Optional[str]:
        """Return the binding type name.

        This base implementation returns None, in which case ``__init__``
        falls back to its ``type`` argument.
        """
        return None

    def __init__(
        self,
        name: str,
        direction: BindingDirection,
        data_type: Optional[DataType] = None,
        type: Optional[str] = None,
    ):  # NoQa
        """Store the common binding attributes.

        :param name: Name of the binding.
        :param direction: Direction of the binding.
        :param data_type: Optional data type of the binding.
        :param type: Binding type, used only when
            :meth:`get_binding_name` returns None.
        """
        # For natively supported bindings, get_binding_name is always
        # implemented, and for generic bindings, type is a required argument
        # in decorator functions.
        binding_name = self.get_binding_name()
        self.type = binding_name if binding_name is not None else type
        self.name = name
        self._direction = direction
        self._data_type = data_type
        self._dict = {
            "direction": self._direction,
            "dataType": self._data_type,
            "type": self.type,
        }

    @property
    def data_type(self) -> Optional[int]:
        """Value of the data type enum member, or None when unset."""
        return self._data_type.value if self._data_type else None

    @property
    def direction(self) -> int:
        """Value of the direction enum member."""
        return self._direction.value

    def get_dict_repr(binding, input_types):
        """Build a dictionary of a particular binding. The keys are camel
        cased binding field names defined in `init_params` list and
        :class:`Binding` class.

        This method is invoked in function :func:`get_raw_bindings` to
        generate the json dict for each binding. It updates and returns
        ``binding._dict`` in place.

        :param binding: The binding object to describe.
        :param input_types: Mapping from binding name to an object exposing
            a ``pytype`` attribute; a missing entry yields a pytype of None.
        :return: Tuple ``(binding_dict, binding_info)`` where
            ``binding_info`` is ``{binding.name: {pytype: "True"|"False"}}``
            and the string tells whether deferred binding is enabled.
        """
        # dict.fromkeys de-duplicates the names while keeping their order.
        for param in dict.fromkeys(getattr(binding, "init_params", [])):
            if param not in Binding.EXCLUDED_INIT_PARAMS:
                binding._dict[to_camel_case(param)] = getattr(binding, param, None)

        input_type = input_types.get(binding.name)
        pytype = input_type.pytype if input_type is not None else None

        # Adding flag to signal to the host to send MBD object
        # 1. check if the binding is a supported type (blob, blobTrigger)
        # 2. check if the binding is an input binding
        # 3. check if the defined type is an SdkType
        supports_deferred_binding = bool(
            binding.type in meta._ConverterMeta._bindings
            and binding.direction == 0
            and meta._ConverterMeta.check_supported_type(pytype)
        )
        binding._dict["properties"] = {
            "SupportsDeferredBinding": supports_deferred_binding
        }
        binding_info = {binding.name: {pytype: str(supports_deferred_binding)}}
        return binding._dict, binding_info
def to_camel_case(snake_case_str: str):
    """Convert a word or snake case string to camel case.

    The first underscore-separated segment is kept as is; every following
    segment is title-cased and appended.

    :param snake_case_str: Word or snake case string to convert.
    :return: The camel case form of the input.
    :raises ValueError: When the input is None or empty, or is neither a
        snake case string nor a single word.
    """
    is_empty = snake_case_str is None or len(snake_case_str) == 0
    if is_empty:
        raise ValueError(f"Please ensure arg name {snake_case_str} is not empty!")

    if not (is_snake_case(snake_case_str) or is_word(snake_case_str)):
        raise ValueError(
            f"Please ensure {snake_case_str} is a word or snake case "
            f"string with underscore as separator."
        )

    head, *tail = snake_case_str.split("_")
    return head + "".join(part.title() for part in tail)
def is_snake_case(input_string: str) -> bool:
    """
    Tell whether a string is formatted as "snake case".

    The decision is made by matching the string against ``SNAKE_CASE_RE``.
    A matching string:

    - is made of letters, digits and underscores
    - contains at least one underscore
    - does not start with a number

    *Examples:*

    >>> is_snake_case('foo_bar_baz') # returns true
    >>> is_snake_case('foo') # returns false

    :param input_string: String to test.
    :return: True for a snake case string, false otherwise.
    """
    matched = SNAKE_CASE_RE.match(input_string)
    return bool(matched)
def is_word(input_string: str) -> bool:
    """
    Tell whether a string is a single word.

    The decision is made by matching the string against ``WORD_RE``.
    A matching string:

    - is made only of letters and digits
    - does not start with a number

    *Examples:*

    >>> is_word('1foo') # returns false
    >>> is_word('foo_') # returns false
    >>> is_word('foo') # returns true

    :param input_string: String to test.
    :return: True for one word string, false otherwise.
    """
    matched = WORD_RE.match(input_string)
    return bool(matched)
def get_raw_bindings(indexed_function, input_types):
    """Serialize every binding of an indexed function to a JSON string.

    :param indexed_function: Object whose ``_bindings`` attribute is
        iterated to obtain the bindings.
    :param input_types: Passed through to :meth:`Binding.get_dict_repr`
        for each binding.
    :return: Tuple ``(raw_bindings, bindings_logs)``: the list of JSON
        strings, one per binding, and the merged per-binding info dicts
        returned by :meth:`Binding.get_dict_repr`.
    """
    raw_bindings = []
    merged_logs = {}
    for binding in indexed_function._bindings:
        raw_dict, binding_info = Binding.get_dict_repr(binding, input_types)
        # Drop None-valued fields before serializing.
        payload = json.dumps(
            BuildDictMeta.clean_nones(raw_dict), cls=StringifyEnumJsonEncoder
        )
        raw_bindings.append(payload)
        merged_logs.update(binding_info)
    return raw_bindings, merged_logs