in src/aws_encryption_sdk/streaming_client.py [0:0]
def __new__(cls, **kwargs):
    """Perform necessary handling for _EncryptionStream instances that should be
    applied to all children.

    Creates the instance, resolves its configuration object, and sets the
    initial stream-tracking attributes (``config``, ``bytes_read``,
    ``output_buffer``, ``_message_prepped``, ``source_stream`` and
    ``_stream_length``) on it.

    :param config: Optional pre-built configuration. It is used as-is only if
        it is an instance of ``cls._config_class``; any other value is discarded
        and a configuration is built from the remaining keyword arguments.
    :param \\**kwargs: Keyword arguments passed to ``cls._config_class`` when no
        usable ``config`` was supplied.
    :returns: The new, partially initialized instance of ``cls``.
    :raises TypeError: if ``cls`` does not define callable ``_read_bytes`` and
        ``_prep_message`` attributes and a ``_config_class`` attribute.
    """
    # Patch for abstractmethod-like enforcement in io.IOBase grandchildren.
    # Each required method is checked against its own name; a missing attribute
    # resolves to None, which is not callable.
    required_methods = ("_read_bytes", "_prep_message")
    has_required_methods = all(callable(getattr(cls, name, None)) for name in required_methods)
    if not has_required_methods or not hasattr(cls, "_config_class"):
        raise TypeError("Can't instantiate abstract class {}".format(cls.__name__))

    instance = super(_EncryptionStream, cls).__new__(cls)

    # "config" is always removed from kwargs so that it is never forwarded to
    # the config class constructor, even when the supplied value is unusable.
    config = kwargs.pop("config", None)
    if not isinstance(config, instance._config_class):  # pylint: disable=protected-access
        config = instance._config_class(**kwargs)  # pylint: disable=protected-access
    instance.config = config

    instance.bytes_read = 0
    instance.output_buffer = b""
    instance._message_prepped = False  # pylint: disable=protected-access
    instance.source_stream = instance.config.source
    instance._stream_length = instance.config.source_length  # pylint: disable=protected-access
    return instance