From patchwork Sun Jul 12 22:41:34 2020 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: [09, of, 11, V2] procutil: ensure that procutil.std{out, err}.write() writes all bytes From: Manuel Jacob X-Patchwork-Id: 46713 Message-Id: <4adabe764af5c4104da5.1594593694@tmp> To: mercurial-devel@mercurial-scm.org Date: Mon, 13 Jul 2020 00:41:34 +0200 # HG changeset patch # User Manuel Jacob # Date 1594376878 -7200 # Fri Jul 10 12:27:58 2020 +0200 # Node ID 4adabe764af5c4104da59140a04c1102592e60c1 # Parent c904a1769e30652813eb1df619e8ee7d8feb6102 # EXP-Topic stdio procutil: ensure that procutil.std{out,err}.write() writes all bytes Python 3 offers different kind of streams and it’s not guaranteed for all of them that calling write() writes all bytes. When Python is started in unbuffered mode, sys.std{out,err}.buffer are instances of io.FileIO, whose write() can write less bytes for platform-specific reasons (e.g. Linux has a 0x7ffff000 bytes maximum and could write less if interrupted by a signal; when writing to Windows consoles, it’s limited to 32767 bytes to avoid the "not enough space" error). This can lead to silent loss of data, both when using sys.std{out,err}.buffer (which may in fact not be a buffered stream) and when using the text streams sys.std{out,err} (I’ve created a CPython bug report for that: https://bugs.python.org/issue41221). Python may fix the problem at some point. For now, we implement our own wrapper for procutil.std{out,err} that calls the raw stream’s write() method until all bytes have been written. We don’t use sys.std{out,err} for larger writes, so I think it’s not worth the effort to patch them. diff --git a/mercurial/utils/procutil.py b/mercurial/utils/procutil.py --- a/mercurial/utils/procutil.py +++ b/mercurial/utils/procutil.py @@ -101,16 +101,49 @@ return LineBufferedWrapper(stream) +class WriteAllWrapper(object): + def __init__(self, orig): + self.orig = orig + + def __getattr__(self, attr): + return getattr(self.orig, attr) + + def write(self, s): + write1 = self.orig.write + m = memoryview(s) + total_to_write = len(s) + total_written = 0 + while total_written < total_to_write: + total_written += write1(m[total_written:]) + return total_written + + +io.IOBase.register(WriteAllWrapper) + + +def make_write_all(stream): + assert pycompat.ispy3 + if isinstance(stream, WriteAllWrapper): + return stream + if isinstance(stream, io.BufferedIOBase): + # The io.BufferedIOBase.write() contract guarantees that all data is + # written. + return stream + # In general, the write() method of streams is free to write only part of + # the data. + return WriteAllWrapper(stream) + + if pycompat.ispy3: # Python 3 implements its own I/O streams. # TODO: .buffer might not exist if std streams were replaced; we'll need # a silly wrapper to make a bytes stream backed by a unicode one. stdin = sys.stdin.buffer - stdout = sys.stdout.buffer + stdout = make_write_all(sys.stdout.buffer) if isatty(stdout): # The standard library doesn't offer line-buffered binary streams. stdout = make_line_buffered(stdout) - stderr = sys.stderr.buffer + stderr = make_write_all(sys.stderr.buffer) else: # Python 2 uses the I/O streams provided by the C library. stdin = sys.stdin diff --git a/tests/test-stdio.py b/tests/test-stdio.py --- a/tests/test-stdio.py +++ b/tests/test-stdio.py @@ -7,6 +7,7 @@ import contextlib import errno import os +import signal import subprocess import sys import unittest @@ -41,6 +42,19 @@ ''' +TEST_LARGE_WRITE_CHILD_SCRIPT = r''' +import signal +import sys + +from mercurial import dispatch +from mercurial.utils import procutil + +signal.signal(signal.SIGINT, lambda *x: None) +dispatch.initstdio() +procutil.{stream}.write(b'x' * 1048576) +''' + + @contextlib.contextmanager def _closing(fds): try: @@ -73,8 +87,8 @@ yield rwpair -def _readall(fd, buffer_size): - buf = [] +def _readall(fd, buffer_size, initial_buf=None): + buf = initial_buf or [] while True: try: s = os.read(fd, buffer_size) @@ -111,7 +125,7 @@ ) try: os.close(child_stream) - check_output(stream_receiver) + check_output(stream_receiver, proc) except: # re-raises proc.terminate() raise @@ -122,7 +136,7 @@ def _test_buffering( self, stream, rwpair_generator, expected_output, python_args=[] ): - def check_output(stream_receiver): + def check_output(stream_receiver, proc): self.assertEqual(_readall(stream_receiver, 1024), expected_output) self._test( @@ -154,7 +168,7 @@ ) def _test_flush_stdio(self, stream, rwpair_generator): - def check_output(stream_receiver): + def check_output(stream_receiver, proc): self.assertEqual(_readall(stream_receiver, 1024), b'test') self._test( @@ -176,6 +190,61 @@ def test_flush_stderr_ptys(self): self._test_flush_stdio('stderr', _ptys) + def _test_large_write(self, stream, rwpair_generator, python_args=[]): + if not pycompat.ispy3 and pycompat.isdarwin: + # Python 2 doesn't always retry on EINTR, but the libc might retry. + # So far, it was observed only on macOS that EINTR is raised at the + # Python level. As Python 2 support will be dropped soon-ish, we + # won't attempt to fix it. + raise unittest.SkipTest("raises EINTR on macOS") + + def check_output(stream_receiver, proc): + if not pycompat.iswindows: + # On Unix, we can provoke a partial write() by interrupting it + # by a signal handler as soon as a bit of data was written. + # We test that write() is called until all data is written. + buf = [os.read(stream_receiver, 1)] + proc.send_signal(signal.SIGINT) + else: + # On Windows, there doesn't seem to be a way to cause partial + # writes. + buf = [] + self.assertEqual( + _readall(stream_receiver, 131072, buf), b'x' * 1048576 + ) + + self._test( + TEST_LARGE_WRITE_CHILD_SCRIPT.format(stream=stream), + stream, + rwpair_generator, + check_output, + python_args, + ) + + def test_large_write_stdout_pipes(self): + self._test_large_write('stdout', _pipes) + + def test_large_write_stdout_ptys(self): + self._test_large_write('stdout', _ptys) + + def test_large_write_stdout_pipes_unbuffered(self): + self._test_large_write('stdout', _pipes, python_args=['-u']) + + def test_large_write_stdout_ptys_unbuffered(self): + self._test_large_write('stdout', _ptys, python_args=['-u']) + + def test_large_write_stderr_pipes(self): + self._test_large_write('stderr', _pipes) + + def test_large_write_stderr_ptys(self): + self._test_large_write('stderr', _ptys) + + def test_large_write_stderr_pipes_unbuffered(self): + self._test_large_write('stderr', _pipes, python_args=['-u']) + + def test_large_write_stderr_ptys_unbuffered(self): + self._test_large_write('stderr', _ptys, python_args=['-u']) + if __name__ == '__main__': import silenttestrunner