@@ -101,16 +101,49 @@
return LineBufferedWrapper(stream)
+class WriteAllWrapper(object):
+ def __init__(self, orig):
+ self.orig = orig
+
+ def __getattr__(self, attr):
+ return getattr(self.orig, attr)
+
+ def write(self, s):
+ write1 = self.orig.write
+ m = memoryview(s)
+ total_to_write = len(s)
+ total_written = 0
+ while total_written < total_to_write:
+ total_written += write1(m[total_written:])
+ return total_written
+
+
+io.IOBase.register(WriteAllWrapper)
+
+
+def make_write_all(stream):
+ assert pycompat.ispy3
+ if isinstance(stream, WriteAllWrapper):
+ return stream
+ if isinstance(stream, io.BufferedIOBase):
+ # The io.BufferedIOBase.write() contract guarantees that all data is
+ # written.
+ return stream
+ # In general, the write() method of streams is free to write only part of
+ # the data.
+ return WriteAllWrapper(stream)
+
+
if pycompat.ispy3:
# Python 3 implements its own I/O streams.
# TODO: .buffer might not exist if std streams were replaced; we'll need
# a silly wrapper to make a bytes stream backed by a unicode one.
stdin = sys.stdin.buffer
- stdout = sys.stdout.buffer
+ stdout = make_write_all(sys.stdout.buffer)
if isatty(stdout):
# The standard library doesn't offer line-buffered binary streams.
stdout = make_line_buffered(stdout)
- stderr = sys.stderr.buffer
+ stderr = make_write_all(sys.stderr.buffer)
else:
# Python 2 uses the I/O streams provided by the C library.
stdin = sys.stdin
@@ -7,6 +7,7 @@
import contextlib
import errno
import os
+import signal
import subprocess
import sys
import unittest
@@ -41,6 +42,19 @@
'''
+TEST_LARGE_WRITE_CHILD_SCRIPT = r'''
+import signal
+import sys
+
+from mercurial import dispatch
+from mercurial.utils import procutil
+
+signal.signal(signal.SIGINT, lambda *x: None)
+dispatch.initstdio()
+procutil.{stream}.write(b'x' * 1048576)
+'''
+
+
@contextlib.contextmanager
def _closing(fds):
try:
@@ -73,8 +87,8 @@
yield rwpair
-def _readall(fd, buffer_size):
- buf = []
+def _readall(fd, buffer_size, initial_buf=None):
+ buf = initial_buf or []
while True:
try:
s = os.read(fd, buffer_size)
@@ -111,7 +125,7 @@
)
try:
os.close(child_stream)
- check_output(stream_receiver)
+ check_output(stream_receiver, proc)
except: # re-raises
proc.terminate()
raise
@@ -122,7 +136,7 @@
def _test_buffering(
self, stream, rwpair_generator, expected_output, python_args=[]
):
- def check_output(stream_receiver):
+ def check_output(stream_receiver, proc):
self.assertEqual(_readall(stream_receiver, 1024), expected_output)
self._test(
@@ -154,7 +168,7 @@
)
def _test_flush_stdio(self, stream, rwpair_generator):
- def check_output(stream_receiver):
+ def check_output(stream_receiver, proc):
self.assertEqual(_readall(stream_receiver, 1024), b'test')
self._test(
@@ -176,6 +190,61 @@
def test_flush_stderr_ptys(self):
self._test_flush_stdio('stderr', _ptys)
+ def _test_large_write(self, stream, rwpair_generator, python_args=[]):
+ if not pycompat.ispy3 and pycompat.isdarwin:
+ # Python 2 doesn't always retry on EINTR, but the libc might retry.
+ # So far, it was observed only on macOS that EINTR is raised at the
+ # Python level. As Python 2 support will be dropped soon-ish, we
+ # won't attempt to fix it.
+ raise unittest.SkipTest("raises EINTR on macOS")
+
+ def check_output(stream_receiver, proc):
+ if not pycompat.iswindows:
+ # On Unix, we can provoke a partial write() by interrupting it
+ # by a signal handler as soon as a bit of data was written.
+ # We test that write() is called until all data is written.
+ buf = [os.read(stream_receiver, 1)]
+ proc.send_signal(signal.SIGINT)
+ else:
+ # On Windows, there doesn't seem to be a way to cause partial
+ # writes.
+ buf = []
+ self.assertEqual(
+ _readall(stream_receiver, 131072, buf), b'x' * 1048576
+ )
+
+ self._test(
+ TEST_LARGE_WRITE_CHILD_SCRIPT.format(stream=stream),
+ stream,
+ rwpair_generator,
+ check_output,
+ python_args,
+ )
+
+ def test_large_write_stdout_pipes(self):
+ self._test_large_write('stdout', _pipes)
+
+ def test_large_write_stdout_ptys(self):
+ self._test_large_write('stdout', _ptys)
+
+ def test_large_write_stdout_pipes_unbuffered(self):
+ self._test_large_write('stdout', _pipes, python_args=['-u'])
+
+ def test_large_write_stdout_ptys_unbuffered(self):
+ self._test_large_write('stdout', _ptys, python_args=['-u'])
+
+ def test_large_write_stderr_pipes(self):
+ self._test_large_write('stderr', _pipes)
+
+ def test_large_write_stderr_ptys(self):
+ self._test_large_write('stderr', _ptys)
+
+ def test_large_write_stderr_pipes_unbuffered(self):
+ self._test_large_write('stderr', _pipes, python_args=['-u'])
+
+ def test_large_write_stderr_ptys_unbuffered(self):
+ self._test_large_write('stderr', _ptys, python_args=['-u'])
+
if __name__ == '__main__':
import silenttestrunner