in processor.py [0:0]
def _is_part_processor_protocol(obj: Any) -> bool:
  """Returns True if `obj` implements PartProcessorFn.

  This function is needed as Processors and PartProcessors are Protocols and do
  not have proper runtime type checking.

  A callable is accepted when its type hints consist of exactly one annotated
  parameter of type `ProcessorPart` and a return annotation of the form
  `AsyncIterable[ProcessorPart]`. Types are compared by fully qualified name
  rather than by identity.

  Args:
    obj: any object or function

  Returns:
    True if the type hints of `obj` match the signature described above, False
    otherwise (including when `obj` is not callable or has no return hint).
    Hints that carry no module information (e.g. `None` or a literal value used
    as an annotation) never match and yield False instead of raising.

  Raises:
    Any exception raised by `typing.get_type_hints` while resolving the
    annotations (for instance `NameError` on an unresolvable forward
    reference) is propagated unchanged.
  """

  def _full_name(hint: Any) -> typing.Optional[str]:
    """Returns the full qualified name of `hint`, or None if it has none."""
    module = getattr(hint, '__module__', None)
    if not isinstance(module, str):
      # Plain values such as `None` or `5` expose no `__module__`; they cannot
      # name a class, so report "no name" instead of raising AttributeError.
      return None
    return f"{module}.{getattr(hint, '__qualname__', '')}"

  if not callable(obj):
    return False

  # Extract callable argument hints. Plain functions carry the hints
  # themselves; for any other callable they are read from `__call__`.
  if isinstance(obj, types.FunctionType):
    type_hint = typing.get_type_hints(obj)
  else:
    type_hint = typing.get_type_hints(obj.__call__)  # pylint: disable=attribute-error

  # Return type needs to be defined.
  if 'return' not in type_hint:
    return False
  return_type = type_hint.pop('return')

  # Only one input parameter is defined. Only annotated parameters show up in
  # the hints, so un-annotated ones are not counted here.
  if len(type_hint) != 1:
    return False
  (input_type,) = type_hint.values()

  # Only one generic type in the output type.
  return_args = typing.get_args(return_type)
  if len(return_args) != 1:
    return False

  # We only test on AsyncIterable __qualname__ as AsyncIterable can be declared
  # in typing or collections.abc, and both should be recognized. An annotation
  # object is not guaranteed to expose __qualname__, hence the default.
  if getattr(return_type, '__qualname__', None) != 'AsyncIterable':
    return False

  # Class names on the yielded type and on the input type must both match
  # ProcessorPart. A hint without a qualified name can never match, so reject
  # it before comparing.
  yielded_name = _full_name(return_args[0])
  input_name = _full_name(input_type)
  if yielded_name is None or input_name is None:
    return False

  expected_name = _full_name(ProcessorPart)
  return yielded_name == expected_name and input_name == expected_name