def is_compatible()

in tfx/utils/pure_typing_utils.py [0:0]


def is_compatible(value: Any, tp: Type[_T]) -> TypeGuard[_T]:
  """Whether the value is compatible with the type.

  Similar to builtin.isinstance(), but accepts more advanced subscripted type
  hints.

  Args:
    value: The value under test.
    tp: The type to check acceptability.

  Returns:
    Whether the `value` is compatible with the type `tp`.
  """
  maybe_origin = typing.get_origin(tp)
  maybe_args = typing.get_args(tp)
  if tp is Any:
    return True
  if tp in (None, type(None)):
    return value is None
  if isinstance(tp, enum_type_wrapper.EnumTypeWrapper):
    return value in tp.values()
  if inspect.isclass(tp):
    if not maybe_args:
      if issubclass(tp, dict) and hasattr(tp, '__annotations__'):
        return _is_typed_dict_compatible(value, tp)
      return isinstance(value, tp)
  if maybe_origin is not None:
    # Union[T, U] or T | U
    if maybe_origin in _UNION_ORIGINS:
      assert maybe_args, f'{tp} should be subscripted.'
      return any(is_compatible(value, arg) for arg in maybe_args)
    # Type[T]
    elif maybe_origin is type:
      if not maybe_args:
        return inspect.isclass(value)
      assert len(maybe_args) == 1
      subtype = maybe_args[0]
      if subtype is Any:
        return inspect.isclass(value)
      elif typing.get_origin(subtype) is typing.Union:
        # Convert Type[Union[x, y, ...]] to Union[Type[x], Type[y], ...].
        subtypes = [typing.Type[a] for a in typing.get_args(subtype)]
        return any(is_compatible(value, t) for t in subtypes)
      elif inspect.isclass(subtype):
        return inspect.isclass(value) and issubclass(value, subtype)
    # List[T], Set[T], FrozenSet[T], Iterable[T], Sequence[T], MutableSeuence[T]
    elif maybe_origin in (
        list,
        set,
        frozenset,
        collections.abc.Iterable,
        collections.abc.Sequence,
        collections.abc.MutableSequence,
        collections.abc.Collection,
        collections.abc.Container,
    ):
      if not isinstance(value, maybe_origin):
        return False
      if not maybe_args:
        return True
      assert len(maybe_args) == 1
      if maybe_args[0] is str and isinstance(value, str):
        # `str` is technically Iterable[str], etc., but it's mostly not intended
        # for type checking and fail to catch a bug. Therefore we don't regard
        # str as Iterable[str], etc. This is also consistent with pytype
        # behavior:
        # https://github.com/google/pytype/blob/main/docs/faq.md#why-doesnt-str-match-against-string-iterables
        return False
      return all(is_compatible(v, maybe_args[0]) for v in value)
    # Tuple[T]
    elif maybe_origin is tuple:
      if not isinstance(value, tuple):
        return False
      if not maybe_args:
        return True
      if len(maybe_args) == 2 and maybe_args[-1] is Ellipsis:
        return all(is_compatible(v, maybe_args[0]) for v in value)
      return len(maybe_args) == len(value) and all(
          is_compatible(v, arg) for v, arg in zip(value, maybe_args)
      )
    # Dict[K, V], Mapping[K, V], MutableMapping[K, V]
    elif maybe_origin in (
        dict,
        collections.abc.Mapping,
        collections.abc.MutableMapping,
    ):
      if not isinstance(value, maybe_origin):
        return False
      if not maybe_args:  # Unsubscripted Dict.
        return True
      assert len(maybe_args) == 2
      kt, vt = maybe_args
      return all(
          is_compatible(k, kt) and is_compatible(v, vt)
          for k, v in value.items()
      )
    # Literal[T]
    elif maybe_origin is Literal:
      assert maybe_args
      return value in maybe_args
    elif maybe_origin is Annotated:
      assert maybe_args
      return is_compatible(value, maybe_args[0])
    # Required[T] and NotRequired[T]
    elif maybe_origin in (Required, NotRequired):
      assert len(maybe_args) == 1
      return is_compatible(value, maybe_args[0])
    else:
      raise NotImplementedError(
          f'Type {tp} with unsupported origin type {maybe_origin}.'
      )
  raise NotImplementedError(f'Unsupported type {tp}.')