#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates. (http://www.facebook.com)
from builtins import _index, _str_array
from _builtins import (
_builtin,
_bytes_check,
_bytes_decode,
_bytes_decode_ascii,
_bytes_decode_utf_8,
_bytes_len,
_byteslike_guard,
_int_check,
_object_type_hasattr,
_str_array_iadd,
_str_check,
_str_encode,
_str_encode_ascii,
_str_guard,
_str_len,
_tuple_check,
_tuple_len,
_type,
_Unbound,
_unimplemented,
maxunicode as _maxunicode,
)
codec_search_path = []
codec_search_cache = {}
def register(search_func):
if not callable(search_func):
raise TypeError("argument must be callable")
codec_search_path.append(search_func)
def lookup(encoding):
cached = codec_search_cache.get(encoding)
if cached is not None:
return cached
# Make sure that we loaded the standard codecs.
if not codec_search_path:
import encodings # noqa: F401
normalized_encoding = encoding.lower().replace(" ", "-")
result = None
for search_func in codec_search_path:
result = search_func(normalized_encoding)
if result is None:
continue
if not _tuple_check(result) or _tuple_len(result) != 4:
raise TypeError("codec search functions must return 4-tuples")
break
if result is None:
raise LookupError(f"unknown encoding: {encoding}")
codec_search_cache[encoding] = result
return result
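# Illustrative sketch (comment only, not executed): a search function passed to
# register() is expected to return a 4-tuple for names it recognizes and None
# otherwise; by CPython convention the tuple is (encoder, decoder, streamreader,
# streamwriter). The name `_identity_search` below is hypothetical.
#
#     def _identity_search(name):
#         if name != "identity":
#             return None
#         return (latin_1_encode, latin_1_decode, None, None)
#
#     register(_identity_search)
#     lookup("identity")  # returns the 4-tuple and caches it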
def _lookup_text(encoding, alternate_command):
codec = lookup(encoding)
if _type(codec) != tuple:
try:
if not codec._is_text_encoding:
raise LookupError(
f"{encoding} is not a text encoding; "
f"use {alternate_command} to handle arbitrary codecs"
)
except AttributeError:
pass
return codec
def decode(data, encoding: str = "utf-8", errors: str = _Unbound) -> str:
result = _bytes_decode(data, encoding)
if result is not _Unbound:
return result
try:
return _codec_decode_table[encoding.lower()](
data, "strict" if errors is _Unbound else errors
)[0]
except KeyError:
try:
decoder = lookup(encoding)[1]
except LookupError:
raise LookupError(f"unknown encoding: {encoding}")
if errors is _Unbound:
result = decoder(data)
else:
result = decoder(data, errors)
if _tuple_check(result) and _tuple_len(result) == 2:
return result[0]
# CPython does not check to make sure that the second element is an int
raise TypeError("decoder must return a tuple (object,integer)")
def encode(data, encoding: str = "utf-8", errors: str = _Unbound) -> bytes:
result = _str_encode(data, encoding)
if result is not _Unbound:
return result
try:
return _codec_encode_table[encoding.lower()](
data, "strict" if errors is _Unbound else errors
)[0]
except KeyError:
try:
encoder = lookup(encoding)[0]
except LookupError:
raise LookupError(f"unknown encoding: {encoding}")
if errors is _Unbound:
result = encoder(data)
else:
result = encoder(data, errors)
if _tuple_check(result) and _tuple_len(result) == 2:
return result[0]
# CPython does not check to make sure that the second element is an int
raise TypeError("encoder must return a tuple (object, integer)")
def _ascii_decode(data: str, errors: str, index: int, out: _str_array):
_builtin()
def ascii_decode(data: bytes, errors: str = "strict"):
_byteslike_guard(data)
if not _str_check(errors):
raise TypeError(
"ascii_decode() argument 2 must be str or None, not "
f"'{_type(errors).__name__}'"
)
result = _bytes_decode_ascii(data)
if result is not _Unbound:
return result, _bytes_len(data)
result = _str_array()
i = 0
encoded = ""
length = len(data)
while i < length:
encoded, i = _ascii_decode(data, errors, i, result)
if _int_check(encoded):
data, i = _call_decode_errorhandler(
errors, data, result, "ordinal not in range(128)", "ascii", encoded, i
)
if _str_check(encoded):
return encoded, i
# The error handler was the last to write to the result
return str(result), i
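# Illustrative behavior (comment only; assumes the _ascii_decode builtin and the
# "ignore" handler registered below behave as described):
#
#     >>> ascii_decode(b"abc")
#     ('abc', 3)
#     >>> ascii_decode(b"ab\xff", "ignore")
#     ('ab', 3)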
def _ascii_encode(data: str, errors: str, index: int, out: bytearray):
"""Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters any codepoints above 127, it tries using the `errors`
    error handler to fix them internally, but on failure returns a tuple of
    the first and last index of the error.
    If it finishes encoding, it returns a tuple of the final bytes and length.
"""
_builtin()
def ascii_encode(data: str, errors: str = "strict"):
if not _str_check(data):
raise TypeError(
f"ascii_encode() argument 1 must be str, not {_type(data).__name__}"
)
if not _str_check(errors):
raise TypeError(
"ascii_encode() argument 2 must be str or None, not "
f"{_type(errors).__name__}"
)
result = _str_encode_ascii(data)
if result is not _Unbound:
return result, _str_len(data)
result = bytearray()
i = 0
encoded = b""
length = _str_len(data)
while i < length:
encoded, i = _ascii_encode(data, errors, i, result)
if _int_check(encoded):
unicode, pos = _call_encode_errorhandler(
errors, data, "ordinal not in range(128)", "ascii", encoded, i
)
if _bytes_check(unicode):
result += unicode
i = pos
continue
for char in unicode:
if char > "\x7F":
raise UnicodeEncodeError(
"ascii", data, encoded, i, "ordinal not in range(128)"
)
_bytearray_string_append(result, unicode)
i = pos
if _bytes_check(encoded):
return encoded, i
# _ascii_encode encountered an error and _call_encode_errorhandler was the
# last function to write to `result`.
return bytes(result), i
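# Illustrative behavior (comment only; assumes the "ignore" handler registered
# below, which returns ("", end)):
#
#     >>> ascii_encode("abc")
#     (b'abc', 3)
#     >>> ascii_encode("ab\xe9", "ignore")
#     (b'ab', 3)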
def charmap_decode(data, errors="strict", mapping=None):
_byteslike_guard(data)
_str_guard(errors)
if errors != "strict":
_unimplemented()
result = _str_array()
data_len = _bytes_len(data)
i = 0
while i < data_len:
try:
mapped = mapping[data[i]]
if mapped is None or mapped == "\ufffe":
raise UnicodeDecodeError(
"charmap", data, data[i], i, "character maps to <undefined>"
)
if _int_check(mapped):
if mapped < 0 or mapped > _maxunicode:
raise TypeError(
f"character mapping must be in range ({_maxunicode + 1:#x})"
)
mapped = chr(mapped)
elif not _str_check(mapped):
raise TypeError("character mapping must return integer, None or str")
_str_array_iadd(result, mapped)
except (IndexError, KeyError):
raise UnicodeDecodeError(
"charmap", data, data[i], i, "character maps to <undefined>"
)
i += 1
return str(result), data_len
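# Illustrative behavior (comment only): the mapping may yield str or int
# codepoints; missing, None, or U+FFFE entries raise UnicodeDecodeError.
#
#     >>> charmap_decode(b"\x00\x01", "strict", {0: "a", 1: 0x62})
#     ('ab', 2)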
def _escape_decode(data: bytes, errors: str, recode_encoding: str):
"""Tries to decode `data`.
    If it runs into any errors, it returns the error message to raise as a str.
    If it finishes decoding, it returns a tuple of
(decoded, length, first_invalid_escape)
where the first_invalid_escape is either the index into the data of the first
invalid escape sequence, or -1 if none occur.
Will eventually have to handle the recode_encoding argument.
"""
_builtin()
def _escape_decode_stateful(
data: bytes, errors: str = "strict", recode_encoding: str = ""
):
if not _str_check(data):
_byteslike_guard(data)
if not _str_check(errors):
raise TypeError(
"escape_decode() argument 2 must be str or None, not "
f"{type(errors).__name__}"
)
decoded = _escape_decode(data, errors, recode_encoding)
if _str_check(decoded):
raise ValueError(decoded)
return decoded
def escape_decode(data, errors: str = "strict"):
escaped, length, _ = _escape_decode_stateful(data, errors)
return escaped, length
def _latin_1_decode(data: bytes):
_builtin()
def latin_1_decode(data: bytes, errors: str = "strict"):
_byteslike_guard(data)
if not _str_check(errors):
raise TypeError(
"latin_1_decode() argument 2 must be str or None, not "
f"'{_type(errors).__name__}'"
)
return _latin_1_decode(data)
def _latin_1_encode(data: str, errors: str, index: int, out: bytearray):
"""Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters any codepoints above 255, it tries using the `errors`
    error handler to fix them internally, but on failure returns a tuple of
    the first and last index of the error.
    If it finishes encoding, it returns a tuple of the final bytes and length.
"""
_builtin()
def latin_1_encode(data: str, errors: str = "strict"):
if not _str_check(data):
raise TypeError(
f"latin_1_encode() argument 1 must be str, not {_type(data).__name__}"
)
if not _str_check(errors):
raise TypeError(
"latin_1_encode() argument 2 must be str or None, not "
f"{_type(errors).__name__}"
)
result = bytearray()
i = 0
encoded = b""
length = _str_len(data)
while i < length:
encoded, i = _latin_1_encode(data, errors, i, result)
if _int_check(encoded):
unicode, pos = _call_encode_errorhandler(
errors, data, "ordinal not in range(256)", "latin-1", encoded, i
)
if _bytes_check(unicode):
result += unicode
i = pos
continue
for char in unicode:
if char > "\xFF":
raise UnicodeEncodeError(
"latin-1", data, encoded, i, "ordinal not in range(256)"
)
result += latin_1_encode(unicode, errors)[0]
i = pos
if _bytes_check(encoded):
return encoded, i
# _latin_1_encode encountered an error and _call_encode_errorhandler was the
# last function to write to `result`.
return bytes(result), i
def _raw_unicode_escape_decode(data: bytes, errors: str, index: int, out: _str_array):
"""Tries to decode `data`, starting from `index`, into the `out` _str_array.
    Only decodes raw unicode escapes of the form \\uXXXX or \\UXXXXXXXX.
    If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message).
    If it finishes decoding, it returns a tuple of
    (decoded, length).
"""
_builtin()
def raw_unicode_escape_decode(data, errors: str = "strict"):
if not _str_check(data):
_byteslike_guard(data)
if not _str_check(errors):
raise TypeError(
"raw_unicode_escape_decode() argument 2 must be str, not "
f"{type(errors).__name__}"
)
result = _str_array()
i = 0
decoded = ""
length = len(data)
while i < length:
decoded, i, error_msg = _raw_unicode_escape_decode(data, errors, i, result)
if error_msg:
data, i = _call_decode_errorhandler(
errors, data, result, error_msg, "rawunicodeescape", decoded, i
)
if _str_check(decoded):
return decoded, i
# The error handler was the last to write to the result
return str(result), i
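# Illustrative behavior (comment only; assumes the _raw_unicode_escape_decode
# builtin follows CPython's raw-unicode-escape semantics): only \uXXXX and
# \UXXXXXXXX escapes are interpreted, other bytes pass through unchanged.
#
#     >>> raw_unicode_escape_decode(b"\\u0041b")
#     ('Ab', 7)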
def _raw_unicode_escape_encode(data):
_builtin()
def raw_unicode_escape_encode(data, errors: str = "strict"):
if not _str_check(data):
raise TypeError(
f"raw_unicode_escape_encode() argument 1 must be str, not {_type(data).__name__}"
)
if not _str_check(errors):
raise TypeError(
"raw_unicode_escape_encode() argument 2 must be str, not "
f"{type(errors).__name__}"
)
return _raw_unicode_escape_encode(data)
def _unicode_escape_decode(data: bytes, errors: str, index: int, out: _str_array):
"""Tries to decode `data`, starting from `index`, into the `out` _str_array.
If it runs into any errors, it returns a tuple of
(error_start, error_end, error_message, first_invalid_escape),
where the first_invalid_escape is either the index into the data of the first
invalid escape sequence, or -1 if none occur.
    If it finishes decoding, it returns a tuple of
(decoded, length, "", first_invalid_escape)
"""
_builtin()
def _unicode_escape_decode_stateful(data: bytes, errors: str = "strict"):
if not _str_check(data):
_byteslike_guard(data)
if not _str_check(errors):
raise TypeError(
"unicode_escape_decode() argument 2 must be str or None, not "
f"{type(errors).__name__}"
)
result = _str_array()
i = 0
decoded = ""
length = len(data)
while i < length:
decoded, i, error_msg, first_invalid = _unicode_escape_decode(
data, errors, i, result
)
if error_msg:
data, i = _call_decode_errorhandler(
errors, data, result, error_msg, "unicodeescape", decoded, i
)
if _str_check(decoded):
return decoded, i, first_invalid
# The error handler was the last to write to the result
return str(result), i, first_invalid
def unicode_escape_decode(data, errors: str = "strict"):
escaped, length, _ = _unicode_escape_decode_stateful(data, errors)
return escaped, length
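# Illustrative behavior (comment only; assumes the _unicode_escape_decode
# builtin follows CPython's unicode-escape semantics):
#
#     >>> unicode_escape_decode(b"a\\nb")
#     ('a\nb', 4)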
def unicode_escape_encode(data, errors: str = "strict"):
_unimplemented()
def _utf_8_decode(
data: bytes, errors: str, index: int, out: _str_array, is_final: bool
):
"""Tries to decode `data`, starting from `index`, into the `out` _str_array.
If it runs into any errors, it returns a tuple of
    (error_start, error_end, error_message).
    If it finishes decoding, it returns a tuple of
    (decoded, length, "")
"""
_builtin()
def utf_8_decode(data: bytes, errors: str = "strict", is_final: bool = False):
_byteslike_guard(data)
    if not _str_check(errors) and errors is not None:
raise TypeError(
"utf_8_decode() argument 2 must be str or None, not "
f"'{_type(errors).__name__}'"
)
result = _bytes_decode_utf_8(data)
if result is not _Unbound:
return result, _bytes_len(data)
result = _str_array()
i = 0
encoded = ""
length = len(data)
while i < length:
encoded, i, errmsg = _utf_8_decode(data, errors, i, result, is_final)
if _int_check(encoded):
data, i = _call_decode_errorhandler(
errors, data, result, errmsg, "utf-8", encoded, i
)
continue
        # If encoded isn't an int, _utf_8_decode stopped early because it hit
        # an incomplete sequence it could recover from given more data and
        # is_final is false. We should stop decoding in this case.
break
if _str_check(encoded):
return encoded, i
# The error handler was the last to write to the result
return str(result), i
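# Illustrative behavior (comment only; assumes CPython-like semantics in the
# _utf_8_decode builtin): with is_final=False a trailing incomplete sequence is
# not an error, only the complete prefix is consumed. BufferedIncrementalDecoder
# below relies on this.
#
#     >>> utf_8_decode(b"caf\xc3", "strict", False)
#     ('caf', 3)
#     >>> utf_8_decode(b"caf\xc3\xa9", "strict", True)
#     ('café', 5)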
def _utf_8_encode(data: str, errors: str, index: int, out: bytearray):
"""Tries to encode `data`, starting from `index`, into the `out` bytearray.
    If it encounters an error, it tries using the `errors` error handler to
    fix it internally, but on failure returns a tuple of the first and last
    index of the error.
If it finishes encoding, it returns a tuple of the final bytes and length.
"""
_builtin()
def utf_8_encode(data: str, errors: str = "strict"):
if not _str_check(data):
raise TypeError(
f"utf_8_encode() argument 1 must be str, not {_type(data).__name__}"
)
if not _str_check(errors):
raise TypeError(
"utf_8_encode() argument 2 must be str or None, not "
f"{_type(errors).__name__}"
)
result = bytearray()
i = 0
encoded = bytes()
length = _str_len(data)
while i < length:
encoded, i = _utf_8_encode(data, errors, i, result)
if _int_check(encoded):
unicode, pos = _call_encode_errorhandler(
errors, data, "surrogates not allowed", "utf-8", encoded, i
)
if _bytes_check(unicode):
result += unicode
i = pos
continue
for char in unicode:
if char > "\x7F":
raise UnicodeEncodeError(
"utf-8", data, encoded, i, "surrogates not allowed"
)
_bytearray_string_append(result, unicode)
i = pos
if _bytes_check(encoded):
return encoded, i
# _utf_8_encode encountered an error and _call_encode_errorhandler was the
# last function to write to `result`.
return bytes(result), i
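# Illustrative behavior (comment only): well-formed text takes the _str_encode
# fast path; lone surrogates invoke the error handler, e.g. "ignore" drops them.
#
#     >>> utf_8_encode("café")
#     (b'caf\xc3\xa9', 4)
#     >>> utf_8_encode("a\ud800b", "ignore")
#     (b'ab', 3)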
def _utf_16_encode(data: str, errors: str, index: int, out: bytearray, byteorder: int):
_builtin()
def utf_16_encode(data: str, errors: str = "strict", byteorder: int = 0): # noqa: C901
if byteorder < 0:
h_encoding = "utf-16-le"
u_encoding = "utf_16_le"
    elif byteorder > 0:
h_encoding = "utf-16-be"
u_encoding = "utf_16_be"
else:
h_encoding = "utf-16"
u_encoding = "utf_16"
if not _str_check(data):
raise TypeError(
f"{u_encoding}_encode() argument 1 must be str, not {_type(data).__name__}"
)
if not _str_check(errors):
raise TypeError(
f"{u_encoding}_encode() argument 2 must be str or None, not "
f"{_type(errors).__name__}"
)
result = bytearray()
if byteorder == 0:
result += b"\xFF"
result += b"\xFE"
i = 0
length = _str_len(data)
encoded = bytes(result)
while i < length:
encoded, i = _utf_16_encode(data, errors, i, result, byteorder)
if _int_check(encoded):
unicode, pos = _call_encode_errorhandler(
errors, data, "surrogates not allowed", h_encoding, encoded, i
)
if _bytes_check(unicode):
if _bytes_len(unicode) & 1:
raise UnicodeEncodeError(
h_encoding, data, encoded, i, "surrogates not allowed"
)
result += unicode
i = pos
continue
for char in unicode:
if char > "\x7F":
raise UnicodeEncodeError(
h_encoding, data, encoded, i, "surrogates not allowed"
)
result += utf_16_encode(
unicode, errors, -1 if byteorder == 0 else byteorder
)[0]
i = pos
if _bytes_check(encoded):
return encoded, i
# _utf_16_encode encountered an error and _call_encode_errorhandler was the
# last function to write to `result`.
return bytes(result), i
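# Illustrative behavior (comment only): byteorder 0 prepends the little-endian
# BOM b"\xff\xfe", byteorder < 0 selects UTF-16-LE and > 0 UTF-16-BE without a
# BOM. The sample output assumes the _utf_16_encode builtin emits little-endian
# data for the default byteorder.
#
#     >>> utf_16_encode("ab")
#     (b'\xff\xfea\x00b\x00', 2)
#     >>> utf_16_encode("ab", "strict", 1)
#     (b'\x00a\x00b', 2)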
def utf_16_le_decode(data: str, errors: str = "strict"):
_unimplemented()
def utf_16_le_encode(data: str, errors: str = "strict"):
return utf_16_encode(data, errors, -1)
def utf_16_be_decode(data: str, errors: str = "strict"):
_unimplemented()
def utf_16_be_encode(data: str, errors: str = "strict"):
return utf_16_encode(data, errors, 1)
def _utf_32_encode(data: str, errors: str, index: int, out: bytearray, byteorder: int):
_builtin()
def utf_32_encode(data: str, errors: str = "strict", byteorder: int = 0): # noqa: C901
if byteorder < 0:
hEncoding = "utf-32-le"
uEncoding = "utf_32_le"
    elif byteorder > 0:
hEncoding = "utf-32-be"
uEncoding = "utf_32_be"
else:
hEncoding = "utf-32"
uEncoding = "utf_32"
if not _str_check(data):
raise TypeError(
f"{uEncoding}_encode() argument 1 must be str, not {_type(data).__name__}"
)
if not _str_check(errors):
raise TypeError(
f"{uEncoding}_encode() argument 2 must be str or None, not "
f"{_type(errors).__name__}"
)
result = bytearray()
if byteorder == 0:
result += b"\xFF\xFE\x00\x00"
i = 0
length = _str_len(data)
encoded = bytes(result)
while i < length:
encoded, i = _utf_32_encode(data, errors, i, result, byteorder)
if _int_check(encoded):
unicode, pos = _call_encode_errorhandler(
errors, data, "surrogates not allowed", hEncoding, encoded, i
)
if _bytes_check(unicode):
if _bytes_len(unicode) & 3:
raise UnicodeEncodeError(
hEncoding, data, encoded, i, "surrogates not allowed"
)
result += unicode
i = pos
continue
for char in unicode:
if char > "\x7f":
raise UnicodeEncodeError(
hEncoding, data, encoded, i, "surrogates not allowed"
)
result += utf_32_encode(
unicode, errors, -1 if byteorder == 0 else byteorder
)[0]
i = pos
if _bytes_check(encoded):
return encoded, i
# _utf_32_encode encountered an error and _call_encode_errorhandler was the
# last function to write to `result`.
return bytes(result), i
def utf_32_le_encode(data: str, errors: str = "strict"):
return utf_32_encode(data, errors, -1)
def utf_32_be_encode(data: str, errors: str = "strict"):
return utf_32_encode(data, errors, 1)
_codec_decode_table = {
"ascii": ascii_decode,
"us_ascii": ascii_decode,
"latin1": latin_1_decode,
"latin 1": latin_1_decode,
"latin-1": latin_1_decode,
"latin_1": latin_1_decode,
"utf_8": utf_8_decode,
"utf-8": utf_8_decode,
"utf8": utf_8_decode,
}
_codec_encode_table = {
"ascii": ascii_encode,
"us_ascii": ascii_encode,
"latin_1": latin_1_encode,
"latin-1": latin_1_encode,
"iso-8859-1": latin_1_encode,
"iso_8859_1": latin_1_encode,
"utf_8": utf_8_encode,
"utf-8": utf_8_encode,
"utf8": utf_8_encode,
"utf_16": utf_16_encode,
"utf-16": utf_16_encode,
"utf16": utf_16_encode,
"utf_16_le": utf_16_le_encode,
"utf-16-le": utf_16_le_encode,
"utf_16_be": utf_16_be_encode,
"utf-16-be": utf_16_be_encode,
"utf_32": utf_32_encode,
"utf-32": utf_32_encode,
"utf32": utf_32_encode,
"utf_32_le": utf_32_le_encode,
"utf-32-le": utf_32_le_encode,
"utf_32_be": utf_32_be_encode,
"utf-32-be": utf_32_be_encode,
}
def backslashreplace_errors(error):
_builtin()
def strict_errors(error):
if not isinstance(error, Exception):
raise TypeError("codec must pass exception instance")
raise error
def ignore_errors(error):
if not isinstance(error, UnicodeError):
raise TypeError(
f"don't know how to handle {_type(error).__name__} in error callback"
)
return ("", error.end)
def lookup_error(error: str):
if not _str_check(error):
raise TypeError(
f"lookup_error() argument must be str, not {_type(error).__name__}"
)
try:
return _codec_error_registry[error]
except KeyError:
raise LookupError(f"unknown error handler name '{error}'")
def register_error(name: str, error_func):
if not _str_check(name):
raise TypeError(
f"register_error() argument 1 must be str, not {_type(name).__name__}"
)
if not callable(error_func):
raise TypeError("handler must be callable")
_codec_error_registry[name] = error_func
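# Illustrative sketch (comment only): a handler registered here receives the
# Unicode*Error instance and must return a (str-or-bytes, int) pair, as enforced
# by _call_encode_errorhandler / _call_decode_errorhandler below. The name
# "qmark" and the function _qmark_errors are hypothetical.
#
#     def _qmark_errors(exc):
#         return ("?", exc.end)  # replace the bad range and resume after it
#
#     register_error("qmark", _qmark_errors)
#     lookup_error("qmark")  # returns _qmark_errors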
def _call_decode_errorhandler(
errors: str,
input: bytes,
output: _str_array,
reason: str,
encoding: str,
start: int,
end: int,
):
"""
Generic decoding errorhandling function
Creates a UnicodeDecodeError, looks up an error handler, and calls the
error handler with the UnicodeDecodeError.
    Makes sure the error handler returns a (str, int) tuple and writes the str
    to the output _str_array passed in.
    Since the error handler can change the object being decoded by replacing
    the object of the UnicodeDecodeError, this function returns the error's
    object field, along with the integer position returned from the handler,
    normalized to fit within the length of that object.
errors: The name of the error handling function to call
input: The input to be decoded
output: The string builder that the error handling result should be appended to
reason: The reason the errorhandler was called
encoding: The encoding being used
    start: The index of the first erroneous byte
    end: The index of the first byte after the error
"""
exception = UnicodeDecodeError(encoding, input, start, end, reason)
result = lookup_error(errors)(exception)
if not _tuple_check(result) or _tuple_len(result) != 2:
raise TypeError("decoding error handler must return (str, int) tuple")
replacement, pos = result
if not _str_check(replacement) or not _object_type_hasattr(pos, "__index__"):
raise TypeError("decoding error handler must return (str, int) tuple")
pos = _index(pos)
input = exception.object
if not _bytes_check(input):
raise TypeError("exception attribute object must be bytes")
if pos < 0:
pos += _bytes_len(input)
if not 0 <= pos <= _bytes_len(input):
raise IndexError(f"position {pos} from error handler out of bounds")
_str_array_iadd(output, replacement)
return (input, pos)
def _call_encode_errorhandler(
errors: str, input: str, reason: str, encoding: str, start: int, end: int
):
"""
Generic encoding errorhandling function
Creates a UnicodeEncodeError, looks up an error handler, and calls the
error handler with the UnicodeEncodeError.
Makes sure the error handler returns a (str/bytes, int) tuple and returns it
errors: The name of the error handling function to call
input: The input to be encoded
reason: The reason the errorhandler was called
encoding: The encoding being used
    start: The index of the first erroneous character
    end: The index of the first character after the error
"""
exception = UnicodeEncodeError(encoding, input, start, end, reason)
result = lookup_error(errors)(exception)
if not _tuple_check(result) or _tuple_len(result) != 2:
raise TypeError("encoding error handler must return (str/bytes, int) tuple")
unicode, pos = result
if (
not _str_check(unicode)
and not _bytes_check(unicode)
or not _object_type_hasattr(pos, "__index__")
):
raise TypeError("encoding error handler must return (str/bytes, int) tuple")
pos = _index(pos)
length = len(input)
if pos < 0:
pos += length
if not 0 <= pos <= length:
raise IndexError(f"position {pos} from error handler out of bounds")
return unicode, pos
# TODO(T61927696): Support surrogatepass errors for utf-8 decode
_codec_error_registry = {
"backslashreplace": backslashreplace_errors,
"strict": strict_errors,
"ignore": ignore_errors,
}
def _bytearray_string_append(dst: bytearray, data: str):
_builtin()
# NOTE: This should behave the same as codecs.IncrementalEncoder.
# TODO(T61720167): Should be removed once we can freeze encodings
class IncrementalEncoder(object):
def __init__(self, errors="strict"):
self.errors = errors
self.buffer = ""
def encode(self, input, final=False):
raise NotImplementedError
def reset(self):
pass
def getstate(self):
return 0
def setstate(self, state):
pass
# NOTE: This should behave the same as codecs.IncrementalDecoder.
# TODO(T61720167): Should be removed once we can freeze encodings
class IncrementalDecoder(object):
def __init__(self, errors="strict"):
self.errors = errors
def decode(self, input, final=False):
raise NotImplementedError
def reset(self):
pass
def getstate(self):
return (b"", 0)
def setstate(self, state):
pass
# NOTE: This should behave the same as codecs.BufferedIncrementalDecoder.
# TODO(T61720167): Should be removed once we can freeze encodings
class BufferedIncrementalDecoder(IncrementalDecoder):
def __init__(self, errors="strict"):
IncrementalDecoder.__init__(self, errors)
self.buffer = b""
def _buffer_decode(self, input, errors, final):
raise NotImplementedError
def decode(self, input, final=False):
data = self.buffer + input
(result, consumed) = self._buffer_decode(data, self.errors, final)
self.buffer = data[consumed:]
return result
def reset(self):
IncrementalDecoder.reset(self)
self.buffer = b""
def getstate(self):
return (self.buffer, 0)
def setstate(self, state):
self.buffer = state[0]
# TODO(T61720167): Should be removed once we can freeze encodings
class UTF8IncrementalEncoder(IncrementalEncoder):
def encode(self, input, final=False):
return utf_8_encode(input, self.errors)[0]
# TODO(T61720167): Should be removed once we can freeze encodings
class UTF8IncrementalDecoder(BufferedIncrementalDecoder):
@staticmethod
def _buffer_decode(input, errors, final):
return utf_8_decode(input, errors, final)
# TODO(T61720167): Should be removed once we can freeze encodings
def getincrementaldecoder(encoding):
if encoding == "UTF-8" or encoding == "utf-8":
return UTF8IncrementalDecoder
decoder = lookup(encoding).incrementaldecoder
if decoder is None:
raise LookupError(encoding)
return decoder
# TODO(T61720167): Should be removed once we can freeze encodings
def getincrementalencoder(encoding):
if encoding == "UTF-8" or encoding == "utf-8":
return UTF8IncrementalEncoder
encoder = lookup(encoding).incrementalencoder
if encoder is None:
raise LookupError(encoding)
return encoder
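# Illustrative usage of the incremental machinery (comment only; values assume
# CPython-like UTF-8 semantics in the underlying builtins):
#
#     >>> dec = getincrementaldecoder("utf-8")()
#     >>> dec.decode(b"caf\xc3")  # trailing byte is buffered, not an error
#     'caf'
#     >>> dec.decode(b"\xa9", final=True)
#     'é'
#     >>> getincrementalencoder("utf-8")().encode("é")
#     b'\xc3\xa9'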