Patchwork D3960: worker: use one pipe per posix worker and select() in parent process

login
register
mail settings
Submitter phabricator
Date July 19, 2018, 12:18 p.m.
Message ID <4813e33dd8f8c35022ecb86a62a51564@localhost.localdomain>
Download mbox | patch
Permalink /patch/32910/
State Not Applicable
Headers show

Comments

phabricator - July 19, 2018, 12:18 p.m.
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG9e6afe7fca31: worker: use one pipe per posix worker and select() in parent process (authored by hooper, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D3960?vs=9626&id=9628

REVISION DETAIL
  https://phab.mercurial-scm.org/D3960

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS




To: hooper, #hg-reviewers
Cc: yuja, mercurial-devel

Patch

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -14,6 +14,12 @@ 
 import threading
 import time
 
+try:
+    import selectors
+    selectors.BaseSelector
+except ImportError:
+    from .thirdparty import selectors2 as selectors
+
 from .i18n import _
 from . import (
     encoding,
@@ -89,7 +95,6 @@ 
     return func(*staticargs + (args,))
 
 def _posixworker(ui, func, staticargs, args):
-    rfd, wfd = os.pipe()
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
@@ -138,7 +143,15 @@ 
     oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler)
     ui.flush()
     parentpid = os.getpid()
+    pipes = []
     for pargs in partition(args, workers):
+        # Every worker gets its own pipe to send results on, so we don't have to
+        # implement atomic writes larger than PIPE_BUF. Each forked process has
+        # its own pipe's descriptors in the local variables, and the parent
+        # process has the full list of pipe descriptors (and it doesn't really
+        # care what order they're in).
+        rfd, wfd = os.pipe()
+        pipes.append((rfd, wfd))
         # make sure we use os._exit in all worker code paths. otherwise the
         # worker may do some clean-ups which could cause surprises like
         # deadlock. see sshpeer.cleanup for example.
@@ -154,6 +167,9 @@ 
                 signal.signal(signal.SIGCHLD, oldchldhandler)
 
                 def workerfunc():
+                    for r, w in pipes[:-1]:
+                        os.close(r)
+                        os.close(w)
                     os.close(rfd)
                     for result in func(*(staticargs + (pargs,))):
                         os.write(wfd, util.pickle.dumps(result))
@@ -175,8 +191,10 @@ 
                 finally:
                     os._exit(ret & 255)
         pids.add(pid)
-    os.close(wfd)
-    fp = os.fdopen(rfd, r'rb', 0)
+    selector = selectors.DefaultSelector()
+    for rfd, wfd in pipes:
+        os.close(wfd)
+        selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ)
     def cleanup():
         signal.signal(signal.SIGINT, oldhandler)
         waitforworkers()
@@ -187,15 +205,19 @@ 
                 os.kill(os.getpid(), -status)
             sys.exit(status)
     try:
-        while True:
-            try:
-                yield util.pickle.load(fp)
-            except EOFError:
-                break
-            except IOError as e:
-                if e.errno == errno.EINTR:
-                    continue
-                raise
+        openpipes = len(pipes)
+        while openpipes > 0:
+            for key, events in selector.select():
+                try:
+                    yield util.pickle.load(key.fileobj)
+                except EOFError:
+                    selector.unregister(key.fileobj)
+                    key.fileobj.close()
+                    openpipes -= 1
+                except IOError as e:
+                    if e.errno == errno.EINTR:
+                        continue
+                    raise
     except: # re-raises
         killworkers()
         cleanup()