Submitter | Bryan O'Sullivan |
---|---|
Date | Feb. 19, 2013, 6:29 p.m. |
Message ID | <42c14cff887e20d033db.1361298555@australite.local> |
Download | mbox | patch |
Permalink | /patch/1032/ |
State | Superseded |
Headers | show |
Comments
On Tue, Feb 19, 2013 at 8:29 PM, Bryan O'Sullivan <bos@serpentine.com> wrote: > > # HG changeset patch > # User Bryan O'Sullivan <bryano@fb.com> > # Date 1361297550 28800 > # Node ID 42c14cff887e20d033dbaa8f8c00100e807a1149 > # Parent 9ef52f0a93a0cba939742743ff59e4c2a2463fab > worker: handle worker failures more aggressively > > We now wait for worker processes in a separate thread, so that we can > spot failures in a timely way, wihout waiting for the progress pipe > to drain. > > If a worker fails, we recover the pre-parallel-update behaviour of > failing early by killing its peers before propagating the failure. > > diff --git a/mercurial/worker.py b/mercurial/worker.py > --- a/mercurial/worker.py > +++ b/mercurial/worker.py > @@ -6,7 +6,7 @@ > # GNU General Public License version 2 or any later version. > > from i18n import _ > -import os, signal, sys, util > +import os, signal, sys, threading, util > > def countcpus(): > '''try to count the number of CPUs on the system''' > @@ -77,6 +77,7 @@ def _posixworker(ui, func, staticargs, a > workers = _numworkers(ui) > oldhandler = signal.getsignal(signal.SIGINT) > signal.signal(signal.SIGINT, signal.SIG_IGN) > + pids, problem = [], [0] > for pargs in partition(args, workers): > pid = os.fork() > if pid == 0: > @@ -88,25 +89,40 @@ def _posixworker(ui, func, staticargs, a > os._exit(0) > except KeyboardInterrupt: > os._exit(255) > + pids.append(pid) > + pids.reverse() Ok, so the last created child will be the first in pids. 
> os.close(wfd) > fp = os.fdopen(rfd, 'rb', 0) > - def cleanup(): > - # python 2.4 is too dumb for try/yield/finally > - signal.signal(signal.SIGINT, oldhandler) > - problem = None > - for i in xrange(workers): > + def killworkers(): > + # if one worker bails, there's no good reason to wait for the > rest > + for p in pids: > + try: > + os.kill(p, signal.SIGTERM) > + except OSError, err: > + if err.errno != errno.ESRCH: > + raise > + def waitforworkers(): > + for p in pids: > pid, st = os.wait() And here you're waiting for it to finish, but what happens if for some reason one of the previous children fails first? Why not use select on the children and also spare the thread?
On Tue, Feb 19, 2013 at 11:00 AM, Idan Kamara <idankk86@gmail.com> wrote: > And here you're waiting for it to finish, but what happens > if for some reason one of the previous children fails first? > os.wait waits for any process to finish. None of the values in the list of pids is used in that loop. > Why not use select on the children and also spare the thread? > select works on file descriptors, not pids. We need some mechanism to catch early-exiting children immediately. We could possibly do that by having a pipe per child and detect EOF on each pipe, but that's way more work. (It might turn out to be necessary at some point, e.g. for Windows, but I don't want to further complicate the code until I know that said complexity is necessary.)
On Tue, Feb 19, 2013 at 9:53 PM, Bryan O'Sullivan <bos@serpentine.com> wrote: > > On Tue, Feb 19, 2013 at 11:00 AM, Idan Kamara <idankk86@gmail.com> wrote: >> >> And here you're waiting for it to finish, but what happens >> if for some reason one of the previous children fails first? > > os.wait waits for any process to finish. None of the values in the list of > pids is used in that loop. Oh, missed that. > >> >> Why not use select on the children and also spare the thread? > > select works on file descriptors, not pids. We need some mechanism to > catch early-exiting children immediately. We could possibly do that by > having a pipe per child and detect EOF on each pipe, but that's way more > work. (It might turn out to be necessary at some point, e.g. for Windows, > but I don't want to further complicate the code until I know that said > complexity is necessary.) Makes sense, thanks for clarifying.
Replying Bryan O'Sullivan: > # HG changeset patch > # User Bryan O'Sullivan <bryano@fb.com> > # Date 1361297550 28800 > # Node ID 42c14cff887e20d033dbaa8f8c00100e807a1149 > # Parent 9ef52f0a93a0cba939742743ff59e4c2a2463fab > worker: handle worker failures more aggressively > > We now wait for worker processes in a separate thread, so that we can > spot failures in a timely way, wihout waiting for the progress pipe > to drain. > > If a worker fails, we recover the pre-parallel-update behaviour of > failing early by killing its peers before propagating the failure. > > diff --git a/mercurial/worker.py b/mercurial/worker.py > --- a/mercurial/worker.py > +++ b/mercurial/worker.py > @@ -6,7 +6,7 @@ > # GNU General Public License version 2 or any later version. > > from i18n import _ > -import os, signal, sys, util > +import os, signal, sys, threading, util > > def countcpus(): > '''try to count the number of CPUs on the system''' > @@ -77,6 +77,7 @@ def _posixworker(ui, func, staticargs, a > workers = _numworkers(ui) > oldhandler = signal.getsignal(signal.SIGINT) > signal.signal(signal.SIGINT, signal.SIG_IGN) > + pids, problem = [], [0] > for pargs in partition(args, workers): > pid = os.fork() > if pid == 0: > @@ -88,25 +89,40 @@ def _posixworker(ui, func, staticargs, a > os._exit(0) > except KeyboardInterrupt: > os._exit(255) > + pids.append(pid) > + pids.reverse() > os.close(wfd) > fp = os.fdopen(rfd, 'rb', 0) > - def cleanup(): > - # python 2.4 is too dumb for try/yield/finally > - signal.signal(signal.SIGINT, oldhandler) > - problem = None > - for i in xrange(workers): > + def killworkers(): > + # if one worker bails, there's no good reason to wait for the rest > + for p in pids: > + try: > + os.kill(p, signal.SIGTERM) > + except OSError, err: > + if err.errno != errno.ESRCH: > + raise > + def waitforworkers(): > + for p in pids: > pid, st = os.wait() > if st and not problem: > - problem = _exitstatus(st) > - if problem: > - if problem < 0: > - os.kill(os.getpid(), 
-problem) > - sys.exit(problem) > + problem[0] = _exitstatus(st) This single element list looks quite strange. Does it have to do with working around the closure or the scoping? Just curious. > + killworkers() > + t = threading.Thread(target=waitforworkers) > + t.start() Please pardon my ignorance, but the execution flow is starting to get confusing. If I understood correctly, the idea is to kill the other children whenever one fails, right? What about putting all workers in the same process group, but different to the parent? Something like: workers = _numworkers(ui) pgid = 0 for pargs in partition(args, workers): pid = os.fork() if pid == 0: try: os.setpgid(0, pgid) os.close(rfd) for i, item in func(...): os.write(...) os._exit(0) except KeyboardInterrupt: os.kill(0, signal.SIGTERM) # I can kill my siblings os._exit(255) elif not pgid: # Place the rest of the children in the same group as the # first pgid = pid I hope it helps. Regards.
On Tue, Feb 19, 2013 at 2:23 PM, Isaac Jurado <diptongo@gmail.com> wrote: > > + problem[0] = _exitstatus(st) > > This single element list looks quite strange. Does it have to do with > working around the closure or the scoping? Welcome to Python, where we can't have nice lexically scoped things. > + t = threading.Thread(target=waitforworkers) > > + t.start() > > Please pardon my ignorance, but the execution flow is starting to get > confusing. If I understood correctly, the idea is to kill the other > children whenever one fails, right? > Yes. > What about putting all workers in the same process group, but different > to the parent? Process groups only exist on Unix. This code should (mostly?) work untouched on other platforms.
Replying Bryan O'Sullivan: > >> + t = threading.Thread(target=waitforworkers) >> + t.start() >> >> Please pardon my ignorance, but the execution flow is starting to get >> confusing. If I understood correctly, the idea is to kill the other >> children whenever one fails, right? >> > > Yes. > >> What about putting all workers in the same process group, but >> different to the parent? > > Process groups only exist on Unix. This code should (mostly?) work > untouched on other platforms. Oh, I didn't catch that. As of the current code, your other emails and the name _posixworker() I assumed, probably wrong, that the Windows implementation would have its own idiosyncrasies. Therefore, I was already aware that process groups only exist on Unix. But so does os.fork(). Am I missing something? Cheers.
On Tue, Feb 19, 2013 at 2:59 PM, Isaac Jurado <diptongo@gmail.com> wrote: > Oh, I didn't catch that. As of the current code, your other emails and > the name _posixworker() I assumed, probably wrong, that the Windows > implementation would have its own idiosyncrasies. > Yes, but I'm trying to keep as much of the code portable as reasonable, so that if someone ports the worker code to Windows, it's minimally invasive.
On Wed, Feb 20, 2013 at 12:10 AM, Bryan O'Sullivan <bos@serpentine.com> wrote: > On Tue, Feb 19, 2013 at 2:59 PM, Isaac Jurado <diptongo@gmail.com> wrote: >> >> Oh, I didn't catch that. As of the current code, your other emails and >> the name _posixworker() I assumed, probably wrong, that the Windows >> implementation would have its own idiosyncrasies. > > > Yes, but I'm trying to keep as much of the code portable as reasonable, so > that if someone ports the worker code to Windows, it's minimally invasive. All right. Then forgive me for insisting on this one more time: What's the plan for Windows? Threads or processes?
On Wed, Feb 20, 2013 at 12:20 AM, Isaac Jurado <diptongo@gmail.com> wrote: > What's the plan for Windows? Threads or processes? > Threads won't help, because update is partly CPU bound and threads will all serialize on the GIL. So if there's to be an answer for Windows, it has to be processes, presumably started via os.spawnv. I'm in no rush to implement a Windows solution. I don't expect an approach based on the multiprocessing module to be useful, because (a) people do all kinds of perverse packaging things on Windows that are unlikely to work with multiprocessing and (b) the multiprocessing module adds a ton of overhead, so you have to do a lot of work to overcome its communication costs. (We can't use any of the various "Windows has something sort of like fork(2)" approaches either, because they are all terribly broken.)
On Wed, Feb 20, 2013 at 7:59 PM, Bryan O'Sullivan <bos@serpentine.com> wrote: > On Wed, Feb 20, 2013 at 12:20 AM, Isaac Jurado <diptongo@gmail.com> wrote: >> >> What's the plan for Windows? Threads or processes? > > Threads won't help, because update is partly CPU bound and threads > will all serialize on the GIL. > > So if there's to be an answer for Windows, it has to be processes, > presumably started via os.spawnv. > > I'm in no rush to implement a Windows solution. I don't expect an > approach based on the multiprocessing module to be useful, because (a) > people do all kinds of perverse packaging things on Windows that are > unlikely to work with multiprocessing and (b) the multiprocessing > module adds a ton of overhead, so you have to do a lot of work to > overcome its communication costs. > > (We can't use any of the various "Windows has something sort of like > fork(2)" approaches either, because they are all terribly broken.) That's my point. Working with processes on Windows is going to be different enough from the other platforms, right? And let me quote your earlier response: > Process groups only exist on Unix. This code should (mostly?) work > untouched on other platforms. Don't you see a bit of contradiction here? Please, don't get me wrong, maybe I'm just failing to accept that my process group suggestion is not as smart as I think it was. But you have to understand that the given reasoning about discarding Unix-only features is, at best, quite confusing. Especially when talking about the process model. My apologies for being a bit of an ass, but I'm very interested in how these patches evolve. Regards.
On Thu, Feb 21, 2013 at 12:22 AM, Isaac Jurado <diptongo@gmail.com> wrote: > That's my point. Working with processes on Windows is going to be > different enough from the other platforms, right? > Okay, I'm confused. Are you looking for me to do something, or just passing the time?
Replying Bryan O'Sullivan: > On Thu, Feb 21, 2013 at 12:22 AM, Isaac Jurado <diptongo@gmail.com> wrote: > >> That's my point. Working with processes on Windows is going to be >> different enough from the other platforms, right? > > Okay, I'm confused. Are you looking for me to do something, or just > passing the time? Last time I reply on this, I promise. Let me summarize what, from my point of view, has happened. - You submitted a patch that uses a monitor thread on the parent process in order to detect a children failure early and finish the rest of the children. - I suggested to leave that task to the children processes by putting all of them in the same process group and "broadcasting" a signal on failure. Which could circumvent the need for threading the parent process. - You replied that process groups are Unix only and you wanted to maximize code portability. - I was confused, since os.fork() and atomic writes to pipes are also Unix only (POSIX in particular). So I tried to ask for a better explanation, as politely as possible. Especially since mailing list discussions tend to heat up very quickly and, between you and me, you are the authority. I don't want any trouble. - After a pair of replies, you told me, more or less, that Windows is a whole different story. Statement that I found very contradictory from the first argument rejecting the use of process groups. Therefore, when, in theory, I was trying to understand the reasoning behind some design decisions, in practice I was unwillingly passing the time. As I said, there's no point in taking this discussion further. Whatever you write will be fine, even if apparently contradictory and if I have any complaint, I'll send a patch. Sorry for the confusion. Best regards.
On Thu, Feb 21, 2013 at 1:58 PM, Isaac Jurado <diptongo@gmail.com> wrote: > - I was confused, since os.fork() and atomic writes to pipes are also > Unix only (POSIX in particular). I see. Thanks for explaining what you were looking for. So, insofar as there's a Windows plan (and my hope is that someone else will pick up the baton there and figure it out), it is not very interesting yet: when developing the Windows code, refactor the process startup code in the worker module so that the Unix-specific stuff is more self-contained. I didn't do that early on because I don't have Windows code sitting around to suggest what shape the portability stuff should have. That's all. The process killing code is already portable as far as I can tell, so based on my current understanding, it can stay as it is. By the way, I think that small writes to pipes may possibly be atomic on Windows, although the WriteFile documentation doesn't state this clearly anywhere.
Patch
diff --git a/mercurial/worker.py b/mercurial/worker.py --- a/mercurial/worker.py +++ b/mercurial/worker.py @@ -6,7 +6,7 @@ # GNU General Public License version 2 or any later version. from i18n import _ -import os, signal, sys, util +import os, signal, sys, threading, util def countcpus(): '''try to count the number of CPUs on the system''' @@ -77,6 +77,7 @@ def _posixworker(ui, func, staticargs, a workers = _numworkers(ui) oldhandler = signal.getsignal(signal.SIGINT) signal.signal(signal.SIGINT, signal.SIG_IGN) + pids, problem = [], [0] for pargs in partition(args, workers): pid = os.fork() if pid == 0: @@ -88,25 +89,40 @@ def _posixworker(ui, func, staticargs, a os._exit(0) except KeyboardInterrupt: os._exit(255) + pids.append(pid) + pids.reverse() os.close(wfd) fp = os.fdopen(rfd, 'rb', 0) - def cleanup(): - # python 2.4 is too dumb for try/yield/finally - signal.signal(signal.SIGINT, oldhandler) - problem = None - for i in xrange(workers): + def killworkers(): + # if one worker bails, there's no good reason to wait for the rest + for p in pids: + try: + os.kill(p, signal.SIGTERM) + except OSError, err: + if err.errno != errno.ESRCH: + raise + def waitforworkers(): + for p in pids: pid, st = os.wait() if st and not problem: - problem = _exitstatus(st) - if problem: - if problem < 0: - os.kill(os.getpid(), -problem) - sys.exit(problem) + problem[0] = _exitstatus(st) + killworkers() + t = threading.Thread(target=waitforworkers) + t.start() + def cleanup(): + signal.signal(signal.SIGINT, oldhandler) + t.join() + status = problem[0] + if status: + if status < 0: + os.kill(os.getpid(), -status) + sys.exit(status) try: for line in fp: l = line.split(' ', 1) yield int(l[0]), l[1][:-1] except: # re-raises + killworkers() cleanup() raise cleanup()