Patchwork [09,of,11] worker: allow a function to be run in multiple worker processes

login
register
mail settings
Submitter Bryan O'Sullivan
Date Feb. 9, 2013, 2:06 p.m.
Message ID <dfb4e4bfedfc6db5fddf.1360418809@australite.local>
Download mbox | patch
Permalink /patch/876/
State Accepted
Commit 047110c0e2a8371b54e55a051b5e4f6753eca90d
Headers show

Comments

Bryan O'Sullivan - Feb. 9, 2013, 2:06 p.m.
# HG changeset patch
# User Bryan O'Sullivan <bryano@fb.com>
# Date 1360418465 0
# Node ID dfb4e4bfedfc6db5fddf08b95276de01f1b1366d
# Parent  023956ed1b098df7f93a5c1857b2f5ec00f3e45d
worker: allow a function to be run in multiple worker processes

If we estimate that it will be worth the cost, we run the function in
multiple processes. Otherwise, we run it in-process.

Children report progress to the parent through a pipe.

Not yet implemented on Windows.
Isaac Jurado - Feb. 9, 2013, 3:32 p.m.
Replying Bryan O'Sullivan:
> # HG changeset patch
> # User Bryan O'Sullivan <bryano@fb.com>
> # Date 1360418465 0
> # Node ID dfb4e4bfedfc6db5fddf08b95276de01f1b1366d
> # Parent  023956ed1b098df7f93a5c1857b2f5ec00f3e45d
> worker: allow a function to be run in multiple worker processes
> 
> If we estimate that it will be worth the cost, we run the function in
> multiple processes. Otherwise, we run it in-process.
> 
> Children report progress to the parent through a pipe.
> 
> Not yet implemented on Windows.
> 
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -5,7 +5,7 @@
>  # This software may be used and distributed according to the terms of the
>  # GNU General Public License version 2 or any later version.
>  
> -import os
> +import os, signal, sys
>  
>  def countcpus():
>      '''try to count the number of CPUs on the system'''
> @@ -54,3 +54,58 @@ def worthwhile(costperop, nops):
>      linear = costperop * nops
>      benefit = linear - (_startupcost * _numworkers + linear / _numworkers)
>      return benefit >= 0.15
> +
> +def worker(costperarg, func, staticargs, args):
> +    '''run a function, possibly in parallel in multiple worker
> +    processes.
> +
> +    returns a progress iterator
> +
> +    costperarg - cost of a single task
> +
> +    func - function to run
> +
> +    staticargs - arguments to pass to every invocation of the function
> +
> +    args - arguments to split into chunks, to pass to individual
> +    workers
> +    '''
> +    if worthwhile(costperarg, len(args)):
> +        return _platformworker(func, staticargs, args)
> +    return func(*staticargs + (args,))
> +
> +def _posixworker(func, staticargs, args):
> +    rfd, wfd = os.pipe()
> +    for pargs in partition(args, _numworkers):
> +        pid = os.fork()
> +        if pid == 0:
> +            try:
> +                os.close(rfd)
> +                for i, item in func(*(staticargs + (pargs,))):
> +                    os.write(wfd, '%d %s\n' % (i, item))

Cool trick!  Taking advantage of atomic pipe writes :-)

> +                os._exit(0)
> +            except KeyboardInterrupt:
> +                os._exit(255)
> +    os.close(wfd)
> +    fp = os.fdopen(rfd, 'rb', 0)
> +    oldhandler = signal.getsignal(signal.SIGINT)
> +    signal.signal(signal.SIGINT, signal.SIG_IGN)
> +    def cleanup():
> +        # python 2.4 is too dumb for try/yield/finally
> +        signal.signal(signal.SIGINT, oldhandler)
> +        problems = 0
> +        for i in xrange(_numworkers):
> +            problems |= os.wait()[1]
> +        if problems:
> +            sys.exit(1)
> +    try:
> +        for line in fp:
> +            l = line.split(' ', 1)
> +            yield int(l[0]), l[1][:-1]
> +    except: # re-raises
> +        cleanup()
> +        raise
> +    cleanup()
> +
> +if os.name != 'nt':
> +    _platformworker = _posixworker
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel@selenic.com
> http://selenic.com/mailman/listinfo/mercurial-devel
Idan Kamara - Feb. 9, 2013, 3:35 p.m.
On Sat, Feb 9, 2013 at 4:06 PM, Bryan O'Sullivan <bos@serpentine.com> wrote:
>
> # HG changeset patch
> # User Bryan O'Sullivan <bryano@fb.com>
> # Date 1360418465 0
> # Node ID dfb4e4bfedfc6db5fddf08b95276de01f1b1366d
> # Parent  023956ed1b098df7f93a5c1857b2f5ec00f3e45d
> worker: allow a function to be run in multiple worker processes
>
> If we estimate that it will be worth the cost, we run the function in
> multiple processes. Otherwise, we run it in-process.
>
> Children report progress to the parent through a pipe.
>
> Not yet implemented on Windows.
>
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -5,7 +5,7 @@
>  # This software may be used and distributed according to the terms of the
>  # GNU General Public License version 2 or any later version.
>
> -import os
> +import os, signal, sys
>
>  def countcpus():
>      '''try to count the number of CPUs on the system'''
> @@ -54,3 +54,58 @@ def worthwhile(costperop, nops):
>      linear = costperop * nops
>      benefit = linear - (_startupcost * _numworkers + linear /
> _numworkers)
>      return benefit >= 0.15
> +
> +def worker(costperarg, func, staticargs, args):
> +    '''run a function, possibly in parallel in multiple worker
> +    processes.
> +
> +    returns a progress iterator
> +
> +    costperarg - cost of a single task
> +
> +    func - function to run
> +
> +    staticargs - arguments to pass to every invocation of the function
> +
> +    args - arguments to split into chunks, to pass to individual
> +    workers
> +    '''
> +    if worthwhile(costperarg, len(args)):
> +        return _platformworker(func, staticargs, args)
> +    return func(*staticargs + (args,))
> +
> +def _posixworker(func, staticargs, args):
> +    rfd, wfd = os.pipe()
> +    for pargs in partition(args, _numworkers):
> +        pid = os.fork()
> +        if pid == 0:
> +            try:
> +                os.close(rfd)
> +                for i, item in func(*(staticargs + (pargs,))):
> +                    os.write(wfd, '%d %s\n' % (i, item))
> +                os._exit(0)
> +            except KeyboardInterrupt:
> +                os._exit(255)

Isn't it a problem you're not exiting here for other exceptions
too, assuming this code isn't run by 'hg' (e.g. things that
import mercurial, or the command server)?

> +    os.close(wfd)
> +    fp = os.fdopen(rfd, 'rb', 0)
> +    oldhandler = signal.getsignal(signal.SIGINT)
> +    signal.signal(signal.SIGINT, signal.SIG_IGN)
> +    def cleanup():
> +        # python 2.4 is too dumb for try/yield/finally
> +        signal.signal(signal.SIGINT, oldhandler)
> +        problems = 0
> +        for i in xrange(_numworkers):
> +            problems |= os.wait()[1]
> +        if problems:
> +            sys.exit(1)

This exit is also going to be a problem for things that
call into the internal API.

And doesn't it also lose the cause of the error?
Isaac Jurado - Feb. 9, 2013, 3:39 p.m.
Replying Bryan O'Sullivan:
>
> +    def cleanup():
> +        # python 2.4 is too dumb for try/yield/finally
> +        signal.signal(signal.SIGINT, oldhandler)
> +        problems = 0
> +        for i in xrange(_numworkers):
> +            problems |= os.wait()[1]
> +        if problems:
> +            sys.exit(1)

A curiosity.  Wouldn't this terminate the command server too?  Maybe
it's too soon to ask.

Cheeres.
Bryan O'Sullivan - Feb. 9, 2013, 5:23 p.m.
On Sat, Feb 9, 2013 at 7:35 AM, Idan Kamara <idankk86@gmail.com> wrote:

> This exit is also going to be a problem for things that
> call into the internal API.
>

The master process will only exit if one of the workers exits abnormally.
That should only occur if the worker crashes, in which case we would have
crashed before anyway.
Idan Kamara - Feb. 9, 2013, 5:33 p.m.
On Sat, Feb 9, 2013 at 7:23 PM, Bryan O'Sullivan <bos@serpentine.com> wrote:
>
> On Sat, Feb 9, 2013 at 7:35 AM, Idan Kamara <idankk86@gmail.com> wrote:
>>
>> This exit is also going to be a problem for things that
>> call into the internal API.
>
>
> The master process will only exit if one of the workers exits abnormally.
> That should only occur if the worker crashes, in which case we would have
> crashed before anyway.

An exception would be raised and caught at dispatch, converted to an error
exit code and returned by dispatch.dispatch().

$ grep -r 'sys\.exit' mercurial/
mercurial/sshserver.py:99:        sys.exit(0)
mercurial/keepalive.py:741:        sys.exit()
mercurial/dispatch.py:28:    sys.exit((dispatch(request(sys.argv[1:]))
or 0) & 255)
mercurial/dispatch.py:202:        # Commands shouldn't sys.exit
directly, but give a return code.
mercurial/lsprof.py:105:        sys.exit(2)

The one at dispatch.py:28 isn't invoked by callers to the internal API.
Bryan O'Sullivan - Feb. 9, 2013, 9:57 p.m.
On Sat, Feb 9, 2013 at 9:33 AM, Idan Kamara <idankk86@gmail.com> wrote:

> An exception would be raised and caught at dispatch, converted to an error
> exit code and returned by dispatch.dispatch().
>

Only it doesn't have any cases for "I want to exit silently" except for
when code calls sys.exit ... which is what I already do.

Remember, in this case, the worker process that has crashed has already
printed its abort or crash message. There's no advantage to the master
printing more stuff.

Patch

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -5,7 +5,7 @@ 
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
-import os
+import os, signal, sys
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
@@ -54,3 +54,58 @@  def worthwhile(costperop, nops):
     linear = costperop * nops
     benefit = linear - (_startupcost * _numworkers + linear / _numworkers)
     return benefit >= 0.15
+
+def worker(costperarg, func, staticargs, args):
+    '''run a function, possibly in parallel in multiple worker
+    processes.
+
+    returns a progress iterator
+
+    costperarg - cost of a single task
+
+    func - function to run
+
+    staticargs - arguments to pass to every invocation of the function
+
+    args - arguments to split into chunks, to pass to individual
+    workers
+    '''
+    if worthwhile(costperarg, len(args)):
+        return _platformworker(func, staticargs, args)
+    return func(*staticargs + (args,))
+
+def _posixworker(func, staticargs, args):
+    rfd, wfd = os.pipe()
+    for pargs in partition(args, _numworkers):
+        pid = os.fork()
+        if pid == 0:
+            try:
+                os.close(rfd)
+                for i, item in func(*(staticargs + (pargs,))):
+                    os.write(wfd, '%d %s\n' % (i, item))
+                os._exit(0)
+            except KeyboardInterrupt:
+                os._exit(255)
+    os.close(wfd)
+    fp = os.fdopen(rfd, 'rb', 0)
+    oldhandler = signal.getsignal(signal.SIGINT)
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+    def cleanup():
+        # python 2.4 is too dumb for try/yield/finally
+        signal.signal(signal.SIGINT, oldhandler)
+        problems = 0
+        for i in xrange(_numworkers):
+            problems |= os.wait()[1]
+        if problems:
+            sys.exit(1)
+    try:
+        for line in fp:
+            l = line.split(' ', 1)
+            yield int(l[0]), l[1][:-1]
+    except: # re-raises
+        cleanup()
+        raise
+    cleanup()
+
+if os.name != 'nt':
+    _platformworker = _posixworker