in nailgun-client/py/ng.py [0:0]
def recv_into(self, buffer, nbytes):
    """Read up to ``nbytes`` bytes from the pipe into ``buffer``.

    Issues an overlapped ``ReadFile`` on ``self.pipe`` (signalling
    ``self.read_waitable``), waits for it to finish, and copies the bytes
    that were read into the front of ``buffer``.

    Args:
        buffer: writable, sliceable destination; ``buffer[:nread]`` is
            assigned the received bytes.
        nbytes: maximum number of bytes to request from the pipe.

    Returns:
        The number of bytes actually read.

    Raises:
        NailgunException: with ``NailgunException.CONNECTION_BROKEN`` when
            the read fails because no process is on the other end of the
            pipe. Other failures are reported via ``self._raise_win_err``
            (presumably raising as well -- confirm against its definition).
    """
    # Read into a private ctypes buffer instead of the caller's
    # memoryview, because the OVERLAPPED I/O happens after the
    # method (ReadFile) returns.
    buf = ctypes.create_string_buffer(nbytes)
    olap = OVERLAPPED()
    olap.hEvent = self.read_waitable

    immediate = ReadFile(self.pipe, buf, nbytes, None, olap)
    if not immediate:
        # Only consult the last-error code when ReadFile reported failure:
        # after a successful call the value may be stale and must not be
        # interpreted. Capture it once so the code we test is the code we
        # report.
        err = GetLastError()
        if err == ERROR_NO_PROCESS_ON_OTHER_END_OF_PIPE:
            raise NailgunException(
                "No process on the other end of pipe",
                NailgunException.CONNECTION_BROKEN,
            )
        if err != ERROR_IO_PENDING:
            self._raise_win_err("failed to read %d bytes" % nbytes, err)

    # Wait for the overlapped operation to complete (final argument True)
    # and fetch the transferred byte count.
    nread = wintypes.DWORD()
    if not GetOverlappedResult(self.pipe, olap, nread, True):
        err = GetLastError()
        self._raise_win_err("error while waiting for read", err)

    nread = nread.value
    if not is_py2:
        # Wrap in a memoryview, as python3 does not let you assign from a
        # ctypes.c_char_array slice directly to a memory view, as one is
        # 'c', and one is '<c' struct/buffer proto format.
        buf = compat_memoryview(buf)
    buffer[:nread] = buf[:nread]
    return nread