Lib/__static__/__init__.py (411 lines of code) (raw):
# pyre-strict
from __future__ import annotations
import array
import functools
import random
import time
from asyncio import iscoroutinefunction
from types import FunctionType, Union as typesUnion
from typing import (
_GenericAlias,
Dict,
Iterable,
Literal,
Optional,
Type,
TypeVar,
Tuple,
Union,
final,
_tp_cache,
)
from weakref import WeakValueDictionary
from .enum import Enum, Int64Enum, IntEnum, StringEnum # noqa: F401
from .type_code import (
type_code,
TYPED_INT8,
TYPED_INT16,
TYPED_INT32,
TYPED_INT64,
TYPED_UINT8,
TYPED_UINT16,
TYPED_UINT32,
TYPED_UINT64,
TYPED_DOUBLE,
TYPED_SINGLE,
TYPED_BOOL,
TYPED_CHAR,
)
try:
import _static
except ImportError:
def is_type_static(_t):
return False
def set_type_static(_t):
return _t
def set_type_static_final(_t):
return _t
def set_type_final(_t):
return _t
_static = None
chkdict = dict
chklist = list
def make_recreate_cm(_typ):
def _recreate_cm(self):
return self
return _recreate_cm
else:
chkdict = _static.chkdict
chklist = _static.chklist
is_type_static = _static.is_type_static
set_type_static = _static.set_type_static
set_type_static_final = _static.set_type_static_final
set_type_final = _static.set_type_final
make_recreate_cm = _static.make_recreate_cm
try:
from _static import (
TYPED_INT8,
TYPED_INT16,
TYPED_INT32,
TYPED_INT64,
TYPED_UINT8,
TYPED_UINT16,
TYPED_UINT32,
TYPED_UINT64,
TYPED_DOUBLE,
TYPED_SINGLE,
TYPED_BOOL,
TYPED_CHAR,
RAND_MAX,
rand,
posix_clock_gettime_ns,
__build_cinder_class__,
)
except ImportError:
RAND_MAX = (1 << 31) - 1
def rand():
return random.randint(0, RAND_MAX)
def posix_clock_gettime_ns():
return time.clock_gettime_ns(time.CLOCK_MONOTONIC)
def create_overridden_slot_descriptors_with_default(t):
return None
__build_cinder_class__ = __build_class__
try:
import cinder
except ImportError:
cinder = None
pydict = dict
PyDict = Dict
clen = len
@set_type_final
@type_code(TYPED_UINT64)
class size_t(int):
pass
@set_type_final
@type_code(TYPED_INT64)
class ssize_t(int):
pass
@set_type_final
@type_code(TYPED_INT8)
class int8(int):
pass
byte = int8
@set_type_final
@type_code(TYPED_INT16)
class int16(int):
pass
@set_type_final
@type_code(TYPED_INT32)
class int32(int):
pass
@set_type_final
@type_code(TYPED_INT64)
class int64(int):
pass
@set_type_final
@type_code(TYPED_UINT8)
class uint8(int):
pass
@set_type_final
@type_code(TYPED_UINT16)
class uint16(int):
pass
@set_type_final
@type_code(TYPED_UINT32)
class uint32(int):
pass
@set_type_final
@type_code(TYPED_UINT64)
class uint64(int):
pass
@set_type_final
@type_code(TYPED_SINGLE)
class single(float):
pass
@set_type_final
@type_code(TYPED_DOUBLE)
class double(float):
pass
@set_type_final
@type_code(TYPED_CHAR)
class char(int):
pass
@set_type_final
@type_code(TYPED_BOOL)
class cbool(int):
pass
ArrayElement = TypeVar(
"ArrayElement",
int8,
int16,
int32,
int64,
uint8,
uint16,
uint32,
uint64,
char,
float,
double,
)
_TYPE_SIZES = {tc: array.array(tc).itemsize for tc in array.typecodes}
# These should be in sync with the array module
_TYPE_CODES = {
int8: "b",
uint8: "B",
int16: "h",
uint16: "H",
# apparently, l is equivalent to q for us, but that may not be true everywhere.
int32: "i" if _TYPE_SIZES["i"] == 4 else "l",
uint32: "I" if _TYPE_SIZES["I"] == 4 else "L",
int64: "q",
uint64: "Q",
float: "f",
double: "d",
char: "B",
}
TVarOrType = Union[TypeVar, Type[object]]
def _subs_tvars(
tp: Tuple[TVarOrType, ...],
tvars: Tuple[TVarOrType, ...],
subs: Tuple[TVarOrType, ...],
) -> Type[object]:
"""Substitute type variables 'tvars' with substitutions 'subs'.
These two must have the same length.
"""
if not hasattr(tp, "__args__"):
return tp
new_args = list(tp.__args__)
for a, arg in enumerate(tp.__args__):
if isinstance(arg, TypeVar):
for i, tvar in enumerate(tvars):
if arg == tvar:
if (
tvar.__constraints__
and not isinstance(subs[i], TypeVar)
and not issubclass(subs[i], tvar.__constraints__)
):
raise TypeError(
f"Invalid type for {tvar.__name__}: {subs[i].__name__} when instantiating {tp.__name__}"
)
new_args[a] = subs[i]
else:
new_args[a] = _subs_tvars(arg, tvars, subs)
return _replace_types(tp, tuple(new_args))
def _collect_type_vars(types: Tuple[TVarOrType, ...]) -> Tuple[TypeVar, ...]:
"""Collect all type variable contained in types in order of
first appearance (lexicographic order). For example::
_collect_type_vars((T, List[S, T])) == (T, S)
"""
tvars = []
for t in types:
if isinstance(t, TypeVar) and t not in tvars:
tvars.append(t)
if hasattr(t, "__parameters__"):
tvars.extend([t for t in t.__parameters__ if t not in tvars])
return tuple(tvars)
def make_generic_type(
gen_type: Type[object], params: Tuple[Type[object], ...]
) -> Type[object]:
if len(params) != len(gen_type.__parameters__):
raise TypeError(f"Incorrect number of type arguments for {gen_type.__name__}")
# Substitute params into __args__ replacing instances of __parameters__
return _subs_tvars(
gen_type,
gen_type.__parameters__,
params,
)
def _replace_types(
gen_type: Type[object], subs: Tuple[Type[object], ...]
) -> Type[object]:
existing_inst = gen_type.__origin__.__insts__.get(subs)
if existing_inst is not None:
return existing_inst
# Check if we have a full instantation, and verify the constraints
new_dict = dict(gen_type.__dict__)
has_params = False
for sub in subs:
if isinstance(sub, TypeVar) or hasattr(sub, "__parameters__"):
has_params = True
continue
# Remove the existing StaticGeneric base...
bases = tuple(
base for base in gen_type.__orig_bases__ if not isinstance(base, StaticGeneric)
)
new_dict["__args__"] = subs
if not has_params:
# Instantiated types don't have generic parameters anymore.
del new_dict["__parameters__"]
else:
new_vars = _collect_type_vars(subs)
new_gen = StaticGeneric()
new_gen.__parameters__ = new_vars
new_dict["__orig_bases__"] = bases + (new_gen,)
bases += (StaticGeneric,)
new_dict["__parameters__"] = new_vars
# Eventually we'll want to have some processing of the members here to
# bind the generics through. That may be an actual process which creates
# new objects with the generics bound, or a virtual process. For now
# we just propagate the members to the new type.
param_names = ", ".join(param.__name__ for param in subs)
res = type(f"{gen_type.__origin__.__name__}[{param_names}]", bases, new_dict)
res.__origin__ = gen_type
if not has_params:
# specialize the type
for name, value in new_dict.items():
if isinstance(value, FunctionType):
if hasattr(value, "__runtime_impl__"):
setattr(
res,
name,
_static.specialize_function(res, value.__qualname__, subs),
)
if cinder is not None:
cinder.freeze_type(res)
gen_type.__origin__.__insts__[subs] = res
return res
def _runtime_impl(f):
"""marks a generic function as being runtime-implemented"""
f.__runtime_impl__ = True
return f
class StaticGeneric:
"""Base type used to mark static-Generic classes. Instantations of these
classes share different generic types and the generic type arguments can
be accessed via __args___"""
@_tp_cache
def __class_getitem__(
cls, elem_type: Tuple[Union[TypeVar, Type[object]]]
) -> Union[StaticGeneric, Type[object]]:
if not isinstance(elem_type, tuple):
# we specifically recurse to hit the type cache
return cls[
elem_type,
]
if cls is StaticGeneric:
res = StaticGeneric()
res.__parameters__ = elem_type
return res
return set_type_static_final(make_generic_type(cls, elem_type))
def __init_subclass__(cls) -> None:
type_vars = _collect_type_vars(cls.__orig_bases__)
cls.__origin__ = cls
cls.__parameters__ = type_vars
if not hasattr(cls, "__args__"):
cls.__args__ = type_vars
cls.__insts__ = WeakValueDictionary()
def __mro_entries__(self, bases) -> Tuple[Type[object, ...]]:
return (StaticGeneric,)
def __repr__(self) -> str:
return (
"<StaticGeneric: "
+ ", ".join([param.__name__ for param in self.__parameters__])
+ ">"
)
@set_type_final
class Array(array.array, StaticGeneric[ArrayElement]):
__slots__ = ()
def __new__(cls, initializer: int | Iterable[ArrayElement]):
if hasattr(cls, "__parameters__"):
raise TypeError("Cannot create plain Array")
typecode = _TYPE_CODES[cls.__args__[0]]
if isinstance(initializer, int):
res = array.array.__new__(cls, typecode, [0])
res *= initializer
return res
else:
return array.array.__new__(cls, typecode, initializer)
def __getitem__(self, index):
if isinstance(index, slice):
return type(self)(array.array.__getitem__(self, index))
return array.array.__getitem__(self, index)
def __deepcopy__(self, memo):
return type(self)(self)
@set_type_final
class Vector(array.array, StaticGeneric[ArrayElement]):
"""Vector is a resizable array of primitive elements"""
__slots__ = ()
def __new__(cls, initializer: int | Iterable[ArrayElement] | None = None):
if hasattr(cls, "__parameters__"):
raise TypeError("Cannot create plain Vector")
typecode = _TYPE_CODES[cls.__args__[0]]
if isinstance(initializer, int):
# specifing size
res = array.array.__new__(cls, typecode, [0])
res *= initializer
return res
elif initializer is not None:
return array.array.__new__(cls, typecode, initializer)
else:
return array.array.__new__(cls, typecode)
if _static is not None:
@_runtime_impl
def append(self, value: ArrayElement) -> None:
super().append(value)
def __getitem__(self, index):
if isinstance(index, slice):
return type(self)(array.array.__getitem__(self, index))
return array.array.__getitem__(self, index)
def __deepcopy__(self, memo):
return type(self)(self)
def box(o):
return o
def unbox(o):
return o
def allow_weakrefs(klass):
return klass
def dynamic_return(func):
return func
def inline(func):
return func
def _donotcompile(func):
return func
def cast(typ, val):
union_args = None
if type(typ) is _GenericAlias:
typ, args = typ.__origin__, typ.__args__
if typ is Union:
union_args = args
elif type(typ) is typesUnion:
union_args = typ.__args__
if union_args:
typ = None
if len(union_args) == 2:
if union_args[0] is type(None):
typ = union_args[1]
elif union_args[1] is type(None):
typ = union_args[0]
if typ is None:
raise ValueError("cast expects type or Optional[T]")
if val is None:
return None
inst_type = type(val)
if typ not in inst_type.__mro__:
raise TypeError(f"expected {typ.__name__}, got {type(val).__name__}")
return val
def prod_assert(value: bool, message: Optional[str] = None):
if not value:
raise AssertionError(message) if message else AssertionError()
CheckedDict = chkdict
CheckedList = chklist
async def _return_none():
"""This exists solely as a helper for the ContextDecorator C implementation"""
pass
class ExcContextDecorator:
def __enter__(self) -> None:
return self
def __exit__(self, exc_type: object, exc_value: object, traceback: object) -> bool:
return False
@final
def __call__(self, func):
if not iscoroutinefunction(func):
@functools.wraps(func)
def _no_profile_inner(*args, **kwds):
with self._recreate_cm():
return func(*args, **kwds)
else:
@functools.wraps(func)
async def _no_profile_inner(*args, **kwds):
with self._recreate_cm():
return await func(*args, **kwds)
# This will replace the vector call entry point with a C implementation.
# We still want to return a function object because various things check
# for functions or if an object is a co-routine.
return _static.make_context_decorator_wrapper(self, _no_profile_inner, func)
ExcContextDecorator._recreate_cm = make_recreate_cm(ExcContextDecorator)
set_type_static(ExcContextDecorator)
class ContextDecorator(ExcContextDecorator):
"""A ContextDecorator which cannot suppress exceptions."""
def __exit__(
self, exc_type: object, exc_value: object, traceback: object
) -> Literal[False]:
return False
set_type_static(ContextDecorator)