Patchwork [3,of,3] worker: handle worker failures more aggressively

Submitter Bryan O'Sullivan
Date Feb. 19, 2013, 6:29 p.m.
Message ID <42c14cff887e20d033db.1361298555@australite.local>
Permalink /patch/1032/
State Superseded

Comments

Bryan O'Sullivan - Feb. 19, 2013, 6:29 p.m.
# HG changeset patch
# User Bryan O'Sullivan <bryano@fb.com>
# Date 1361297550 28800
# Node ID 42c14cff887e20d033dbaa8f8c00100e807a1149
# Parent  9ef52f0a93a0cba939742743ff59e4c2a2463fab
worker: handle worker failures more aggressively

We now wait for worker processes in a separate thread, so that we can
spot failures in a timely way, without waiting for the progress pipe
to drain.

If a worker fails, we recover the pre-parallel-update behaviour of
failing early by killing its peers before propagating the failure.
Idan Kamara - Feb. 19, 2013, 7 p.m.
On Tue, Feb 19, 2013 at 8:29 PM, Bryan O'Sullivan <bos@serpentine.com>
wrote:
>
> # HG changeset patch
> # User Bryan O'Sullivan <bryano@fb.com>
> # Date 1361297550 28800
> # Node ID 42c14cff887e20d033dbaa8f8c00100e807a1149
> # Parent  9ef52f0a93a0cba939742743ff59e4c2a2463fab
> worker: handle worker failures more aggressively
>
> We now wait for worker processes in a separate thread, so that we can
> spot failures in a timely way, without waiting for the progress pipe
> to drain.
>
> If a worker fails, we recover the pre-parallel-update behaviour of
> failing early by killing its peers before propagating the failure.
>
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -6,7 +6,7 @@
>  # GNU General Public License version 2 or any later version.
>
>  from i18n import _
> -import os, signal, sys, util
> +import os, errno, signal, sys, threading, util
>
>  def countcpus():
>      '''try to count the number of CPUs on the system'''
> @@ -77,6 +77,7 @@ def _posixworker(ui, func, staticargs, a
>      workers = _numworkers(ui)
>      oldhandler = signal.getsignal(signal.SIGINT)
>      signal.signal(signal.SIGINT, signal.SIG_IGN)
> +    pids, problem = [], [0]
>      for pargs in partition(args, workers):
>          pid = os.fork()
>          if pid == 0:
> @@ -88,25 +89,40 @@ def _posixworker(ui, func, staticargs, a
>                  os._exit(0)
>              except KeyboardInterrupt:
>                  os._exit(255)
> +        pids.append(pid)
> +    pids.reverse()

Ok, so the last created child will be the first in pids.

>      os.close(wfd)
>      fp = os.fdopen(rfd, 'rb', 0)
> -    def cleanup():
> -        # python 2.4 is too dumb for try/yield/finally
> -        signal.signal(signal.SIGINT, oldhandler)
> -        problem = None
> -        for i in xrange(workers):
> +    def killworkers():
> +        # if one worker bails, there's no good reason to wait for the rest
> +        for p in pids:
> +            try:
> +                os.kill(p, signal.SIGTERM)
> +            except OSError, err:
> +                if err.errno != errno.ESRCH:
> +                    raise
> +    def waitforworkers():
> +        for p in pids:
>              pid, st = os.wait()

And here you're waiting for it to finish, but what happens
if for some reason one of the previous children fails first?

Why not use select on the children and also spare the thread?
Bryan O'Sullivan - Feb. 19, 2013, 7:53 p.m.
On Tue, Feb 19, 2013 at 11:00 AM, Idan Kamara <idankk86@gmail.com> wrote:

> And here you're waiting for it to finish, but what happens
> if for some reason one of the previous children fails first?
>

os.wait waits for any process to finish. None of the values in the list of
pids is used in that loop.
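
To illustrate (a toy example, not from the patch): os.wait() reaps
whichever child exits first, regardless of the order in which they were
forked.

    import os

    pids = []
    for _ in range(3):
        pid = os.fork()
        if pid == 0:
            os._exit(0)        # child exits immediately
        pids.append(pid)
    for _ in pids:
        # returns (pid, status) for whichever child exits first
        print os.wait()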


> Why not use select on the children and also spare the thread?
>

select works on file descriptors, not pids. We need some mechanism to catch
early-exiting children immediately. We could possibly do that by having a
pipe per child and detect EOF on each pipe, but that's way more work. (It
might turn out to be necessary at some point, e.g. for Windows, but I don't
want to further complicate the code until I know that said complexity is
necessary.)
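
To make that concrete, here is a minimal sketch of the pipe-per-child
idea, assuming one private pipe per worker (illustrative only, not part
of this patch): each child holds the write end of its own pipe and never
writes to it, so the parent sees EOF on the read end via select() as
soon as that child exits, for any reason.

    import os, select

    def watchchildren(nchildren):
        fds = {}                        # read fd -> child pid
        for _ in range(nchildren):
            rfd, wfd = os.pipe()
            pid = os.fork()
            if pid == 0:
                os.close(rfd)
                # ... the child would do its work here; wfd is closed
                # automatically when the process exits ...
                os._exit(0)
            os.close(wfd)               # parent keeps only the read end
            fds[rfd] = pid
        while fds:
            ready = select.select(fds.keys(), [], [])[0]
            for rfd in ready:
                if not os.read(rfd, 1): # EOF: that child has exited
                    os.waitpid(fds.pop(rfd), 0)
                    os.close(rfd)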
Idan Kamara - Feb. 19, 2013, 7:56 p.m.
On Tue, Feb 19, 2013 at 9:53 PM, Bryan O'Sullivan <bos@serpentine.com>
wrote:
>
> On Tue, Feb 19, 2013 at 11:00 AM, Idan Kamara <idankk86@gmail.com> wrote:
>>
>> And here you're waiting for it to finish, but what happens
>> if for some reason one of the previous children fails first?
>
> os.wait waits for any process to finish. None of the values in the list of
> pids is used in that loop.

Oh, missed that.

>
>>
>> Why not use select on the children and also spare the thread?
>
> select works on file descriptors, not pids. We need some mechanism to
> catch early-exiting children immediately. We could possibly do that by
> having a pipe per child and detect EOF on each pipe, but that's way more
> work. (It might turn out to be necessary at some point, e.g. for Windows,
> but I don't want to further complicate the code until I know that said
> complexity is necessary.)

Makes sense, thanks for clarifying.
Isaac Jurado - Feb. 19, 2013, 10:23 p.m.
Replying Bryan O'Sullivan:
> # HG changeset patch
> # User Bryan O'Sullivan <bryano@fb.com>
> # Date 1361297550 28800
> # Node ID 42c14cff887e20d033dbaa8f8c00100e807a1149
> # Parent  9ef52f0a93a0cba939742743ff59e4c2a2463fab
> worker: handle worker failures more aggressively
> 
> We now wait for worker processes in a separate thread, so that we can
> spot failures in a timely way, without waiting for the progress pipe
> to drain.
> 
> If a worker fails, we recover the pre-parallel-update behaviour of
> failing early by killing its peers before propagating the failure.
> 
> diff --git a/mercurial/worker.py b/mercurial/worker.py
> --- a/mercurial/worker.py
> +++ b/mercurial/worker.py
> @@ -6,7 +6,7 @@
>  # GNU General Public License version 2 or any later version.
>  
>  from i18n import _
> -import os, signal, sys, util
> +import os, errno, signal, sys, threading, util
>  
>  def countcpus():
>      '''try to count the number of CPUs on the system'''
> @@ -77,6 +77,7 @@ def _posixworker(ui, func, staticargs, a
>      workers = _numworkers(ui)
>      oldhandler = signal.getsignal(signal.SIGINT)
>      signal.signal(signal.SIGINT, signal.SIG_IGN)
> +    pids, problem = [], [0]
>      for pargs in partition(args, workers):
>          pid = os.fork()
>          if pid == 0:
> @@ -88,25 +89,40 @@ def _posixworker(ui, func, staticargs, a
>                  os._exit(0)
>              except KeyboardInterrupt:
>                  os._exit(255)
> +        pids.append(pid)
> +    pids.reverse()
>      os.close(wfd)
>      fp = os.fdopen(rfd, 'rb', 0)
> -    def cleanup():
> -        # python 2.4 is too dumb for try/yield/finally
> -        signal.signal(signal.SIGINT, oldhandler)
> -        problem = None
> -        for i in xrange(workers):
> +    def killworkers():
> +        # if one worker bails, there's no good reason to wait for the rest
> +        for p in pids:
> +            try:
> +                os.kill(p, signal.SIGTERM)
> +            except OSError, err:
> +                if err.errno != errno.ESRCH:
> +                    raise
> +    def waitforworkers():
> +        for p in pids:
>              pid, st = os.wait()
> -            if st and not problem:
> -                problem = _exitstatus(st)
> -        if problem:
> -            if problem < 0:
> -                os.kill(os.getpid(), -problem)
> -            sys.exit(problem)
> +            if st and not problem[0]:
> +                problem[0] = _exitstatus(st)

This single element list looks quite strange.  Does it have to do with
working around the closure or the scoping?  Just curious.

> +                killworkers()
> +    t = threading.Thread(target=waitforworkers)
> +    t.start()

Please pardon my ignorance, but the execution flow is starting to get
confusing.  If I understood correctly, the idea is to kill the other
children whenever one fails, right?

What about putting all workers in the same process group, but different
to the parent?  Something like:

    workers = _numworkers(ui)
    pgid = 0
    for pargs in partition(args, workers):
        pid = os.fork()
        if pid == 0:
            try:
                os.setpgid(0, pgid)
                os.close(rfd)
                for i, item in func(...):
                    os.write(...)
                os._exit(0)
            except KeyboardInterrupt:
                os.kill(0, signal.SIGTERM)  # I can kill my siblings
                os._exit(255)
        elif not pgid:
            # Place the rest of the children in the same group as the
            # first
            pgid = pid

I hope it helps.  Regards.
Bryan O'Sullivan - Feb. 19, 2013, 10:29 p.m.
On Tue, Feb 19, 2013 at 2:23 PM, Isaac Jurado <diptongo@gmail.com> wrote:

> > +                problem[0] = _exitstatus(st)
>
> This single element list looks quite strange.  Does it have to do with
> working around the closure or the scoping?


Welcome to Python, where we can't have nice lexically scoped things.
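
A tiny illustration of the workaround: Python 2 has no 'nonlocal', so a
nested function cannot rebind a name in its enclosing scope, but it can
mutate an object that the name refers to.

    def parent():
        problem = [0]          # one-element list shared with the closure
        def child():
            # 'problem = 1' here would just create a local variable
            problem[0] = 1     # mutating the list is visible outside
        child()
        return problem[0]      # returns 1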

> > +    t = threading.Thread(target=waitforworkers)
> > +    t.start()
>
> Please pardon my ignorance, but the execution flow is starting to get
> confusing.  If I understood correctly, the idea is to kill the other
> children whenever one fails, right?
>

Yes.


> What about putting all workers in the same process group, but different
> to the parent?


Process groups only exist on Unix. This code should (mostly?) work
untouched on other platforms.
Isaac Jurado - Feb. 19, 2013, 10:59 p.m.
Replying Bryan O'Sullivan:
>
>> +    t = threading.Thread(target=waitforworkers)
>> +    t.start()
>>
>> Please pardon my ignorance, but the execution flow is starting to get
>> confusing.  If I understood correctly, the idea is to kill the other
>> children whenever one fails, right?
>>
>
> Yes.
>
>> What about putting all workers in the same process group, but
>> different to the parent?
>
> Process groups only exist on Unix. This code should (mostly?) work
> untouched on other platforms.

Oh, I didn't catch that.  From the current code, your other emails, and
the name _posixworker(), I assumed, probably wrongly, that the Windows
implementation would have its own idiosyncrasies.

Therefore, I was already aware that process groups only exist on Unix.
But so does os.fork().  Am I missing something?

Cheers.
Bryan O'Sullivan - Feb. 19, 2013, 11:10 p.m.
On Tue, Feb 19, 2013 at 2:59 PM, Isaac Jurado <diptongo@gmail.com> wrote:

> Oh, I didn't catch that.  From the current code, your other emails, and
> the name _posixworker(), I assumed, probably wrongly, that the Windows
> implementation would have its own idiosyncrasies.
>

Yes, but I'm trying to keep as much of the code portable as reasonable, so
that if someone ports the worker code to Windows, it's minimally invasive.
Isaac Jurado - Feb. 20, 2013, 8:20 a.m.
On Wed, Feb 20, 2013 at 12:10 AM, Bryan O'Sullivan <bos@serpentine.com> wrote:
> On Tue, Feb 19, 2013 at 2:59 PM, Isaac Jurado <diptongo@gmail.com> wrote:
>>
>> Oh, I didn't catch that.  From the current code, your other emails, and
>> the name _posixworker(), I assumed, probably wrongly, that the Windows
>> implementation would have its own idiosyncrasies.
>
>
> Yes, but I'm trying to keep as much of the code portable as reasonable, so
> that if someone ports the worker code to Windows, it's minimally invasive.

All right.  Then forgive me for insisting on this one more time:

What's the plan for Windows?  Threads or processes?
Bryan O'Sullivan - Feb. 20, 2013, 6:59 p.m.
On Wed, Feb 20, 2013 at 12:20 AM, Isaac Jurado <diptongo@gmail.com> wrote:

> What's the plan for Windows?  Threads or processes?
>

Threads won't help, because update is partly CPU bound and threads will all
serialize on the GIL.

So if there's to be an answer for Windows, it has to be processes,
presumably started via os.spawnv.

I'm in no rush to implement a Windows solution. I don't expect an approach
based on the multiprocessing module to be useful, because (a) people do all
kinds of perverse packaging things on Windows that are unlikely to work
with multiprocessing and (b) the multiprocessing module adds a ton of
overhead, so you have to do a lot of work to overcome its communication
costs.

(We can't use any of the various "Windows has something sort of like
fork(2)" approaches either, because they are all terribly broken.)
Isaac Jurado - Feb. 21, 2013, 8:22 a.m.
On Wed, Feb 20, 2013 at 7:59 PM, Bryan O'Sullivan <bos@serpentine.com> wrote:
> On Wed, Feb 20, 2013 at 12:20 AM, Isaac Jurado <diptongo@gmail.com> wrote:
>>
>> What's the plan for Windows?  Threads or processes?
>
> Threads won't help, because update is partly CPU bound and threads
> will all serialize on the GIL.
>
> So if there's to be an answer for Windows, it has to be processes,
> presumably started via os.spawnv.
>
> I'm in no rush to implement a Windows solution. I don't expect an
> approach based on the multiprocessing module to be useful, because (a)
> people do all kinds of perverse packaging things on Windows that are
> unlikely to work with multiprocessing and (b) the multiprocessing
> module adds a ton of overhead, so you have to do a lot of work to
> overcome its communication costs.
>
> (We can't use any of the various "Windows has something sort of like
> fork(2)" approaches either, because they are all terribly broken.)

That's my point.  Working with processes on Windows is going to be
different enough from the other platforms, right?

And let me quote your earlier response:

> Process groups only exist on Unix. This code should (mostly?) work
> untouched on other platforms.

Don't you see a bit of a contradiction here?  Please don't get me wrong:
maybe I'm just failing to accept that my process group suggestion is not
as smart as I thought it was.  But you have to understand that the given
reasoning for discarding Unix-only features is, at best, quite
confusing, especially when talking about the process model.

My apologies for being a bit of an ass, but I'm very interested in how
these patches evolve.

Regards.
Bryan O'Sullivan - Feb. 21, 2013, 6:09 p.m.
On Thu, Feb 21, 2013 at 12:22 AM, Isaac Jurado <diptongo@gmail.com> wrote:

> That's my point.  Working with processes on Windows is going to be
> different enough from the other platforms, right?
>

Okay, I'm confused. Are you looking for me to do something, or just passing
the time?
Isaac Jurado - Feb. 21, 2013, 9:58 p.m.
Replying Bryan O'Sullivan:
> On Thu, Feb 21, 2013 at 12:22 AM, Isaac Jurado <diptongo@gmail.com> wrote:
>
>> That's my point.  Working with processes on Windows is going to be
>> different enough from the other platforms, right?
>
> Okay, I'm confused. Are you looking for me to do something, or just
> passing the time?

This is the last time I reply on this, I promise.  Let me summarize
what, from my point of view, has happened.

  - You submitted a patch that uses a monitor thread in the parent
    process in order to detect a child failure early and terminate the
    rest of the children.

  - I suggested leaving that task to the child processes by putting
    all of them in the same process group and "broadcasting" a signal on
    failure, which could avoid the need for a thread in the parent
    process.

  - You replied that process groups are Unix only and you wanted to
    maximize code portability.

  - I was confused, since os.fork() and atomic writes to pipes are also
    Unix only (POSIX in particular).  So I tried to ask for a better
    explanation, as politely as possible, especially since mailing list
    discussions tend to heat up very quickly and, between you and me,
    you are the authority.  I don't want any trouble.

  - After a couple of replies, you told me, more or less, that Windows
    is a whole different story, a statement that I found quite
    contradictory to the first argument rejecting the use of process
    groups.

Therefore, while in theory I was trying to understand the reasoning
behind some design decisions, in practice I was unwittingly passing the
time.

As I said, there's no point in taking this discussion further.  Whatever
you write will be fine, even if apparently contradictory, and if I have
any complaint, I'll send a patch.

Sorry for the confusion.  Best regards.
Bryan O'Sullivan - Feb. 21, 2013, 10:05 p.m.
On Thu, Feb 21, 2013 at 1:58 PM, Isaac Jurado <diptongo@gmail.com> wrote:

>   - I was confused, since os.fork() and atomic writes to pipes are also
>     Unix only (POSIX in particular).


I see. Thanks for explaining what you were looking for.

So, insofar as there's a Windows plan (and my hope is that someone else
will pick up the baton there and figure it out), it is not very interesting
yet: when developing the Windows code, refactor the process startup code in
the worker module so that the Unix-specific stuff is more self-contained. I
didn't do that early on because I don't have Windows code sitting around to
suggest what shape the portability stuff should have. That's all.

The process killing code is already portable as far as I can tell, so based
on my current understanding, it can stay as it is.

By the way, I think that small writes to pipes may possibly be atomic on
Windows, although the WriteFile documentation doesn't state this clearly
anywhere.

Patch

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -6,7 +6,7 @@ 
 # GNU General Public License version 2 or any later version.
 
 from i18n import _
-import os, signal, sys, util
+import os, errno, signal, sys, threading, util
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
@@ -77,6 +77,7 @@  def _posixworker(ui, func, staticargs, a
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
+    pids, problem = [], [0]
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
@@ -88,25 +89,40 @@  def _posixworker(ui, func, staticargs, a
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
+        pids.append(pid)
+    pids.reverse()
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
-    def cleanup():
-        # python 2.4 is too dumb for try/yield/finally
-        signal.signal(signal.SIGINT, oldhandler)
-        problem = None
-        for i in xrange(workers):
+    def killworkers():
+        # if one worker bails, there's no good reason to wait for the rest
+        for p in pids:
+            try:
+                os.kill(p, signal.SIGTERM)
+            except OSError, err:
+                if err.errno != errno.ESRCH:
+                    raise
+    def waitforworkers():
+        for p in pids:
             pid, st = os.wait()
-            if st and not problem:
-                problem = _exitstatus(st)
-        if problem:
-            if problem < 0:
-                os.kill(os.getpid(), -problem)
-            sys.exit(problem)
+            if st and not problem[0]:
+                problem[0] = _exitstatus(st)
+                killworkers()
+    t = threading.Thread(target=waitforworkers)
+    t.start()
+    def cleanup():
+        signal.signal(signal.SIGINT, oldhandler)
+        t.join()
+        status = problem[0]
+        if status:
+            if status < 0:
+                os.kill(os.getpid(), -status)
+            sys.exit(status)
     try:
         for line in fp:
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
+        killworkers()
         cleanup()
         raise
     cleanup()
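
One note on the cleanup() logic above: it relies on the convention that
_exitstatus (defined elsewhere in worker.py, not shown in this diff)
returns a positive code for a normal exit and a negative signal number
when a worker is killed by a signal, which is why the parent re-raises
the signal on itself when status < 0. A sketch of the POSIX side of
that helper, assuming the usual os.WIF* macros:

    import os

    def _exitstatus(code):
        '''convert a raw status from os.wait() to a shell-like code'''
        if os.WIFEXITED(code):
            return os.WEXITSTATUS(code)   # normal exit: 0..255
        elif os.WIFSIGNALED(code):
            return -os.WTERMSIG(code)     # killed by a signal: negative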