in runtime/under-io-module.cpp [883:995]
// Native short cut for writing a str through a TextIOWrapper whose encoding
// is UTF-8 and whose underlying buffer is a BufferedWriter.
//
// Arguments: args.get(0) is the TextIOWrapper, args.get(1) is the text.
//
// Returns:
//   - a SmallInt holding `text.length()` on success,
//   - Unbound::object() when the short cut does not apply (buffer is not a
//     BufferedWriter, encoding is not "utf-8"/"UTF-8", write translation is
//     off, writenl is "\r\n", or the text contains a surrogate code point);
//     presumably the caller then falls back to a general implementation --
//     TODO confirm against the call site,
//   - an error after raising TypeError (text is not a str, or self is not a
//     TextIOWrapper) or ValueError (buffer detached or closed),
//   - any error produced by the buffer's flush() or the decoder's reset().
RawObject FUNC(_io, _TextIOWrapper_write_UTF8)(Thread* thread, Arguments args) {
  HandleScope scope(thread);
  Object text_obj(&scope, args.get(1));
  Runtime* runtime = thread->runtime();
  if (!runtime->isInstanceOfStr(*text_obj)) {
    return thread->raiseWithFmt(LayoutId::kTypeError,
                                "write() argument must be str, not %T",
                                &text_obj);
  }
  Object self_obj(&scope, args.get(0));
  if (!runtime->isInstanceOfTextIOWrapper(*self_obj)) {
    return thread->raiseRequiresType(self_obj, ID(TextIOWrapper));
  }
  TextIOWrapper text_io(&scope, *self_obj);
  if (text_io.detached()) {
    return thread->raiseWithFmt(LayoutId::kValueError,
                                "underlying buffer has been detached");
  }
  Object buffer_obj(&scope, text_io.buffer());
  if (!buffer_obj.isBufferedWriter()) {
    return Unbound::object();
  }
  BufferedWriter buffer(&scope, text_io.buffer());
  if (buffer.closed()) {
    // An operation on a closed file is a ValueError (not a TypeError), the
    // same category used for the detached-buffer check above.
    return thread->raiseWithFmt(LayoutId::kValueError,
                                "I/O operation on closed file.");
  }
  if (text_io.encoding() != SmallStr::fromCStr("utf-8") &&
      text_io.encoding() != SmallStr::fromCStr("UTF-8")) {
    return Unbound::object();
  }
  Str writenl(&scope, text_io.writenl());
  // Only allow writenl to be cr or lf in this short cut
  if (!text_io.writetranslate() || writenl == SmallStr::fromCStr("\r\n")) {
    return Unbound::object();
  }
  Str text(&scope, strUnderlying(*text_obj));
  word text_len = text.length();

  // Optimistically append the raw bytes of `text` to the writer's buffer; the
  // scan below undoes this with downsize() if the text turns out to be
  // unsuitable for the short cut.
  Bytearray write_buffer(&scope, buffer.writeBuf());
  word old_len = write_buffer.numItems();
  word new_len = old_len + text_len;
  runtime->bytearrayEnsureCapacity(thread, write_buffer, new_len);
  MutableBytes write_buffer_bytes(&scope, write_buffer.items());
  write_buffer_bytes.replaceFromWithStr(old_len, *text, text_len);
  write_buffer.setNumItems(new_len);

  // When writenl is not "\n" (i.e. it is "\r" here, since "\r\n" was rejected
  // above), every '\n' in the appended bytes is rewritten in place to '\r'.
  const bool translate_lf = !(writenl == SmallStr::fromCStr("\n"));
  bool hasnl = false;
  for (word offset = 0; offset < text_len;) {
    word num_bytes;
    int32_t codepoint = text.codePointAt(offset, &num_bytes);
    if (Unicode::isSurrogate(codepoint)) {
      // Roll back the optimistic append and bail out of the short cut.
      write_buffer.downsize(old_len);
      return Unbound::object();
    }
    if (num_bytes == 1) {
      byte ch = text.byteAt(offset);
      if (ch == '\n') {
        hasnl = true;
        if (translate_lf) {
          write_buffer_bytes.byteAtPut(offset + old_len, byte{'\r'});
        }
      } else if (ch == '\r') {
        hasnl = true;
      }
    }
    offset += num_bytes;
  }

  if (text_io.lineBuffering() && hasnl) {
    // TODO(T61927696): Implement native support for
    // BufferedWriter._flush_unlocked to do flush here
    Object flush_result(&scope, thread->invokeMethod1(buffer, ID(flush)));
    if (flush_result.isErrorException()) return *flush_result;
    text_io.setTelling(text_io.seekable());
  }

  // Discard any pending decoded read state and reset the decoder, if any.
  text_io.setDecodedChars(Str::empty());
  text_io.setSnapshot(NoneType::object());
  Object decoder_obj(&scope, text_io.decoder());
  if (!decoder_obj.isNoneType()) {
    Object reset_result(&scope, thread->invokeMethod1(decoder_obj, ID(reset)));
    if (reset_result.isErrorException()) return *reset_result;
  }
  // NOTE(review): text_len is used as a byte bound in the scan above, so this
  // appears to be the encoded byte count rather than the number of code
  // points -- confirm this is the intended write() return value.
  return SmallInt::fromWord(text_len);
}