Patchwork D8076: worker: Manually buffer reads from pickle stream

login
register
mail settings
Submitter phabricator
Date Feb. 7, 2020, 3:53 p.m.
Message ID <254720410fe7abcf617de35004b92570@localhost.localdomain>
Download mbox | patch
Permalink /patch/45021/
State Not Applicable
Headers show

Comments

phabricator - Feb. 7, 2020, 3:53 p.m.
Closed by commit rHG12491abf93bd: worker: manually buffer reads from pickle stream (authored by heftig).
This revision was automatically updated to reflect the committed changes.
This revision was not accepted when it landed; it landed in state "Needs Review".

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D8076?vs=19881&id=19986

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8076/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8076

AFFECTED FILES
  mercurial/worker.py
  tests/test-worker.t

CHANGE DETAILS




To: heftig, #hg-reviewers, yuja
Cc: yuja, mjpieters, mercurial-devel

Patch

diff --git a/tests/test-worker.t b/tests/test-worker.t
--- a/tests/test-worker.t
+++ b/tests/test-worker.t
@@ -131,4 +131,35 @@ 
   abort: known exception
   [255]
 
+Do not crash on partially read result
+
+  $ cat > $TESTTMP/detecttruncated.py <<EOF
+  > from __future__ import absolute_import
+  > import os
+  > import sys
+  > import time
+  > sys.unraisablehook = lambda x: None
+  > oldwrite = os.write
+  > def splitwrite(fd, string):
+  >     ret = oldwrite(fd, string[:9])
+  >     if ret == 9:
+  >         time.sleep(0.1)
+  >         ret += oldwrite(fd, string[9:])
+  >     return ret
+  > os.write = splitwrite
+  > EOF
+
+  $ hg --config "extensions.t=$abspath" --config worker.numcpus=8 --config \
+  > "extensions.d=$TESTTMP/detecttruncated.py" test 100000.0
+  start
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  run
+  done
+
 #endif
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -65,6 +65,41 @@ 
     return min(max(countcpus(), 4), 32)
 
 
+if pycompat.ispy3:
+
+    class _blockingreader(object):
+        def __init__(self, wrapped):
+            self._wrapped = wrapped
+
+        def __getattr__(self, attr):
+            return getattr(self._wrapped, attr)
+
+        # issue multiple reads until size is fulfilled
+        def read(self, size=-1):
+            if size < 0:
+                return self._wrapped.readall()
+
+            buf = bytearray(size)
+            view = memoryview(buf)
+            pos = 0
+
+            while pos < size:
+                ret = self._wrapped.readinto(view[pos:])
+                if not ret:
+                    break
+                pos += ret
+
+            del view
+            del buf[pos:]
+            return buf
+
+
+else:
+
+    def _blockingreader(wrapped):
+        return wrapped
+
+
 if pycompat.isposix or pycompat.iswindows:
     _STARTUP_COST = 0.01
     # The Windows worker is thread based. If tasks are CPU bound, threads
@@ -226,7 +261,7 @@ 
     selector = selectors.DefaultSelector()
     for rfd, wfd in pipes:
         os.close(wfd)
-        selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)
+        selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
 
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
@@ -240,7 +275,7 @@ 
         while openpipes > 0:
             for key, events in selector.select():
                 try:
-                    res = util.pickle.load(key.fileobj)
+                    res = util.pickle.load(_blockingreader(key.fileobj))
                     if hasretval and res[0]:
                         retval.update(res[1])
                     else: