Patchwork D8076: worker: Manually buffer reads from pickle stream

Submitter phabricator
Date Feb. 5, 2020, 12:17 a.m.
Message ID <differential-rev-PHID-DREV-mrjlmw62p3vbinm7kitf-req@mercurial-scm.org>
Permalink /patch/44928/
State Superseded

Comments

phabricator - Feb. 5, 2020, 12:17 a.m.
heftig created this revision.
Herald added subscribers: mercurial-devel, mjpieters.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  My previous fix (D8051 <https://phab.mercurial-scm.org/D8051>, which added Python's built-in buffering to the
  pickle stream) has the problem that the selector ignores the buffer: when
  multiple pickled objects are read from the pipe into the buffer at once, only
  one object is loaded per select wakeup, because the remaining data sits in the
  buffer rather than in the pipe.
  
  This can repeat until the buffer is full and delays the processing of completed
  items until the worker exits, at which point the pipe is always considered
  readable and all remaining items are processed.
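
  A minimal sketch of the failure mode (a toy pipe standing in for the worker
  pipes; the names and values here are illustrative, not from the patch):

    import os
    import pickle
    import select

    rfd, wfd = os.pipe()
    # two pickled objects arrive in a single chunk, as when a fast worker
    # writes several results before the parent wakes up
    os.write(wfd, pickle.dumps(1) + pickle.dumps(2))

    reader = os.fdopen(rfd, 'rb')  # buffered, as after D8051
    print(pickle.load(reader))     # 1 -- object 2 now sits in the buffer
    # the selector watches the OS-level fd, which is now drained, so the
    # second object goes unnoticed until more data (or EOF) arrives
    print(select.select([rfd], [], [], 0)[0])  # [] -- fd looks unreadable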
  
  This changeset reverts D8051 <https://phab.mercurial-scm.org/D8051>, removing the buffer again. Instead, on Python 3
  only, we use a wrapper to modify the "read" provided to the Unpickler to behave
  more like a buffered read. We never read more bytes from the pipe than the
  Unpickler requests, so the selector behaves as expected.
  
  Also add a test case for the "pickle data was truncated" issue.
  
  https://phab.mercurial-scm.org/D8051#119193

REPOSITORY
  rHG Mercurial

BRANCH
  stable

REVISION DETAIL
  https://phab.mercurial-scm.org/D8076

AFFECTED FILES
  mercurial/worker.py
  tests/test-worker.t

To: heftig, #hg-reviewers
Cc: mjpieters, mercurial-devel
phabricator - Feb. 5, 2020, 12:36 a.m.
heftig added a comment.


  I went for the wrapper overriding `read` after failing to find a way to ask `BufferedReader` whether data remains in the buffer. Even `peek` will trigger a blocking read when the buffer is empty.
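
  A quick demonstration of that `peek` behaviour (a toy example, not from the
  patch; the writer thread delays its write so the blocking read is visible):

    import os
    import threading
    import time

    rfd, wfd = os.pipe()
    reader = os.fdopen(rfd, 'rb')

    def writelater():
        time.sleep(0.5)
        os.write(wfd, b'x')

    threading.Thread(target=writelater).start()
    t0 = time.monotonic()
    # the buffer is empty, so peek() falls through to a blocking os.read()
    data = reader.peek(1)
    print('peek blocked %.1fs, returned %r' % (time.monotonic() - t0, data))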

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8076/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8076

To: heftig, #hg-reviewers, yuja
Cc: yuja, mjpieters, mercurial-devel
Yuya Nishihara - Feb. 6, 2020, 3:06 p.m.
> +if pycompat.ispy3:
> +
> +    class _blockingreader(object):
> +        def __init__(self, wrapped):
> +            self._wrapped = wrapped
> +
> +        def __getattr__(self, attr):
> +            return getattr(self._wrapped, attr)
> +
> +        # issue multiple reads until size is fulfilled
> +        def read(self, size=-1):
> +            if size < 0:
> +                return self._wrapped.readall()
> +
> +            buf = bytearray(size)
> +            view = memoryview(buf)
> +            pos = 0
> +
> +            while pos < size:
> +                ret = self._wrapped.readinto(view[pos:])
> +                if not ret:
> +                    break
> +                pos += ret
> +
> +            del view
> +            del buf[pos:]
> +            return buf

Might be better to optimize the common case `wrapped.read(size) == size`.
FWIW, if we don't mind issuing extra `read()` syscalls, maybe we can abuse
a BufferedReader with `buffer_size=1`.

Another option is to rewrite the select loop to fully manage the response
buffer ourselves.

```
for key, events in selector.select():
    ...
    our_buffer.extend(key.fileobj.read())
    temp_io = BytesIO(our_buffer)
    while ...:
        try:
            pickle.load(temp_io)
        except ...
            ...
    del our_buffer[:temp_io.tell()]
```
phabricator - Feb. 7, 2020, 2:50 p.m.
heftig added a comment.


  In D8076#119526 <https://phab.mercurial-scm.org/D8076#119526>, @yuja wrote:
  
  > Might be better to optimize the common case `wrapped.read(size) == size`.
  
  I thought my code was already pretty optimized: We allocate the buffer to read into just once (no matter how many reads we issue) and give it to the unpickler without copying the data.
  
  > FWIW, if we don't mind issuing extra `read()` syscalls, maybe we can abuse
  > a BufferedReader with `buffer_size=1`.
  
  My gut says that using just read(1) will be way worse when we try to transmit large objects. That said, I don't know if the Unpickler tries to do such large reads.
  
  > Another option is to rewrite the select loop to fully manage response buffer
  > by ourselves.
  >
  >   for key, events in selector.select():
  >       ...
  >       our_buffer.extend(key.fileobj.read())
  >       temp_io = BytesIO(our_buffer)
  >       while ...:
  >           try:
  >               pickle.load(temp_io)
  >           except ...
  >               ...
  >       del our_buffer[:temp_io.tell()]
  
  That would be an option, too, although `bytearray.extend` + `BytesIO.read` + `bytearray.__delitem__` looks a bit heavy on the memcpys/memmoves. We would also have to restart the unpickling whenever the data is truncated, and remember when we have actually hit EOF.
  
  Wouldn't it look more like this?
  
    for key, events in selector.select():
        try:
            read_data = key.fileobj.read()
        except IOError as e:
            if e.errno == errno.EINTR:
                continue
            raise
        eof = not read_data
        if eof:
            selector.unregister(key.fileobj)
            key.fileobj.close()
            openpipes -= 1
        buf = buffers[key.fileobj]
        buf.extend(read_data)
        temp_io = BytesIO(buf)
        pos = 0
        while True:
            try:
                res = pickle.load(temp_io)
            except EOFError:
                break
            except UnpicklingError:
                if eof:
                    raise
                break
            pos = temp_io.tell()
            if hasretval and res[0]:
                retval.update(res[1])
            else:
                yield res
            del buf[:pos]
  
  This gets pretty convoluted.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8076/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8076

To: heftig, #hg-reviewers, yuja
Cc: yuja, mjpieters, mercurial-devel
Yuya Nishihara - Feb. 7, 2020, 3:51 p.m.
>   > Might be better to optimize the common case `wrapped.read(size) == size`.
>   
>   I thought my code was already pretty optimized: We allocate the buffer to read into just once (no matter how many reads we issue) and give it to the unpickler without copying the data.

My gut feeling is that running Python code is generally slow compared to calling
a C function, but yeah, that wouldn't matter here.

Queued for stable since the previous attempt was in stable. Thanks.

>   > FWIW, if we don't mind issuing extra `read()` syscalls, maybe we can abuse
>   > a BufferedReader with `buffer_size=1`.
>   
>   My gut says that using just read(1) will be way worse when we try to transmit large objects. That said, I don't know if the Unpickler tries to do such large reads.

Unpickler does issue a few small read()s + one read(serialized_obj_size), so
using unbuffered IO (like the original implementation) isn't optimal. Using
a buffered I/O of buffer_size=1 appears to split read(serialized_obj_size)
into read(1) + read(serialized_obj_size - 1) on Python 3.7, but that is
an implementation detail and I don't know how it differs across Python versions.
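
(Illustrative sketch, not part of the patch: the read pattern is easy to
observe with a tracing wrapper; the exact sizes and the split are CPython
implementation details.)

```
import io
import pickle

class tracingreader(object):
    """Log every read the unpickler issues against the underlying stream."""

    def __init__(self, raw):
        self._raw = raw

    def read(self, size=-1):
        print('read(%d)' % size)
        return self._raw.read(size)

    def readline(self):
        print('readline()')
        return self._raw.readline()

data = pickle.dumps(b'x' * 100000)
pickle.load(tracingreader(io.BytesIO(data)))
# prints a few small reads for the opcodes and frame header, then one
# large read covering the serialized payload
```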

>   That would be an option, too, although `bytearray.extend` + `BytesIO.read` + `bytearray.__delitem__` looks a bit heavy on the memcpys/memmoves. We also have to restart the unpickling whenever the data was truncated. And remember when we're actually EOF.

Supposing repeated `os.read()`s are way slower than memcpy, I guess this is
faster, but I agree it's a bit more complex.

Patch

diff --git a/tests/test-worker.t b/tests/test-worker.t
--- a/tests/test-worker.t
+++ b/tests/test-worker.t
@@ -131,4 +131,35 @@ 
   abort: known exception
   [255]
 
+Do not crash on partially read result
+
+  $ cat > $TESTTMP/detecttruncated.py <<EOF
+  > from __future__ import absolute_import
+  > import os
+  > import sys
+  > import time
+  > sys.unraisablehook = lambda x: None
+  > oldwrite = os.write
+  > def splitwrite(fd, string):
+  >     ret = oldwrite(fd, string[:9])
+  >     if ret == 9:
+  >         time.sleep(0.1)
+  >         ret += oldwrite(fd, string[9:])
+  >     return ret
+  > os.write = splitwrite
+  > EOF
+
+  $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
+  > "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
+  start
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  done
+
 #endif
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -65,6 +65,41 @@ 
     return min(max(countcpus(), 4), 32)
 
 
+if pycompat.ispy3:
+
+    class _blockingreader(object):
+        def __init__(self, wrapped):
+            self._wrapped = wrapped
+
+        def __getattr__(self, attr):
+            return getattr(self._wrapped, attr)
+
+        # issue multiple reads until size is fulfilled
+        def read(self, size=-1):
+            if size < 0:
+                return self._wrapped.readall()
+
+            buf = bytearray(size)
+            view = memoryview(buf)
+            pos = 0
+
+            while pos < size:
+                ret = self._wrapped.readinto(view[pos:])
+                if not ret:
+                    break
+                pos += ret
+
+            del view
+            del buf[pos:]
+            return buf
+
+
+else:
+
+    def _blockingreader(wrapped):
+        return wrapped
+
+
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -226,7 +261,7 @@ 
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
-        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
+        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
@@ -240,7 +275,7 @@ 
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    res = util.pickle.load(key.fileobj)
+                    res = util.pickle.load(_blockingreader(key.fileobj))
                     if hasretval and res[0]:
                         retval.update(res[1])
                     else: