From patchwork Thu Jul 19 12:18:32 2018 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: D3960: worker: use one pipe per posix worker and select() in parent process From: phabricator X-Patchwork-Id: 32910 Message-Id: <4813e33dd8f8c35022ecb86a62a51564@localhost.localdomain> To: mercurial-devel@mercurial-scm.org Date: Thu, 19 Jul 2018 12:18:32 +0000 This revision was automatically updated to reflect the committed changes. Closed by commit rHG9e6afe7fca31: worker: use one pipe per posix worker and select() in parent process (authored by hooper, committed by ). REPOSITORY rHG Mercurial CHANGES SINCE LAST UPDATE https://phab.mercurial-scm.org/D3960?vs=9626&id=9628 REVISION DETAIL https://phab.mercurial-scm.org/D3960 AFFECTED FILES mercurial/worker.py CHANGE DETAILS To: hooper, #hg-reviewers Cc: yuja, mercurial-devel diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -14,6 +14,12 @@ import threading import time +try: + import selectors + selectors.BaseSelector +except ImportError: + from .thirdparty import selectors2 as selectors + from .i18n import _ from . import ( encoding, @@ -89,7 +95,6 @@ return func(*staticargs + (args,)) def _posixworker(ui, func, staticargs, args): - rfd, wfd = os.pipe() workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) @@ -138,7 +143,15 @@ oldchldhandler = signal.signal(signal.SIGCHLD, sigchldhandler) ui.flush() parentpid = os.getpid() + pipes = [] for pargs in partition(args, workers): + # Every worker gets its own pipe to send results on, so we don't have to + # implement atomic writes larger than PIPE_BUF. Each forked process has + # its own pipe's descriptors in the local variables, and the parent + # process has the full list of pipe descriptors (and it doesn't really + # care what order they're in). + rfd, wfd = os.pipe() + pipes.append((rfd, wfd)) # make sure we use os._exit in all worker code paths. otherwise the # worker may do some clean-ups which could cause surprises like # deadlock. see sshpeer.cleanup for example. @@ -154,6 +167,9 @@ signal.signal(signal.SIGCHLD, oldchldhandler) def workerfunc(): + for r, w in pipes[:-1]: + os.close(r) + os.close(w) os.close(rfd) for result in func(*(staticargs + (pargs,))): os.write(wfd, util.pickle.dumps(result)) @@ -175,8 +191,10 @@ finally: os._exit(ret & 255) pids.add(pid) - os.close(wfd) - fp = os.fdopen(rfd, r'rb', 0) + selector = selectors.DefaultSelector() + for rfd, wfd in pipes: + os.close(wfd) + selector.register(os.fdopen(rfd, r'rb', 0), selectors.EVENT_READ) def cleanup(): signal.signal(signal.SIGINT, oldhandler) waitforworkers() @@ -187,15 +205,19 @@ os.kill(os.getpid(), -status) sys.exit(status) try: - while True: - try: - yield util.pickle.load(fp) - except EOFError: - break - except IOError as e: - if e.errno == errno.EINTR: - continue - raise + openpipes = len(pipes) + while openpipes > 0: + for key, events in selector.select(): + try: + yield util.pickle.load(key.fileobj) + except EOFError: + selector.unregister(key.fileobj) + key.fileobj.close() + openpipes -= 1 + except IOError as e: + if e.errno == errno.EINTR: + continue + raise except: # re-raises killworkers() cleanup()