Patchwork [3,of,3,v2] worker: handle worker failures more aggressively

login
register
mail settings
Submitter Bryan O'Sullivan
Date Feb. 19, 2013, 8:54 p.m.
Message ID <aeca36ccd9a5c9dc5894.1361307246@australite.thefacebook.com>
Download mbox | patch
Permalink /patch/1035/
State Accepted
Commit 9955fc5ee24ba0916a63dbaef9458a7d9e0d110d
Headers show

Comments

Bryan O'Sullivan - Feb. 19, 2013, 8:54 p.m.
# HG changeset patch
# User Bryan O'Sullivan <bryano@fb.com>
# Date 1361307231 28800
# Node ID aeca36ccd9a5c9dc5894fdd5456c3eb29a65b672
# Parent  e0bdbbb4e62ae8e1b588d39f514b59557fe75cfc
worker: handle worker failures more aggressively

We now wait for worker processes in a separate thread, so that we can
spot failures in a timely way, wihout waiting for the progress pipe
to drain.

If a worker fails, we recover the pre-parallel-update behaviour of
failing early by killing its peers before propagating the failure.

Patch

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -6,7 +6,7 @@ 
 # GNU General Public License version 2 or any later version.
 
 from i18n import _
-import os, signal, sys, util
+import os, signal, sys, threading, util
 
 def countcpus():
     '''try to count the number of CPUs on the system'''
@@ -77,6 +77,7 @@  def _posixworker(ui, func, staticargs, a
     workers = _numworkers(ui)
     oldhandler = signal.getsignal(signal.SIGINT)
     signal.signal(signal.SIGINT, signal.SIG_IGN)
+    pids, problem = [], [0]
     for pargs in partition(args, workers):
         pid = os.fork()
         if pid == 0:
@@ -88,26 +89,40 @@  def _posixworker(ui, func, staticargs, a
                 os._exit(0)
             except KeyboardInterrupt:
                 os._exit(255)
+        pids.append(pid)
+    pids.reverse()
     os.close(wfd)
     fp = os.fdopen(rfd, 'rb', 0)
+    def killworkers():
+        # if one worker bails, there's no good reason to wait for the rest
+        for p in pids:
+            try:
+                os.kill(p, signal.SIGTERM)
+            except OSError, err:
+                if err.errno != errno.ESRCH:
+                    raise
+    def waitforworkers():
+        for _ in pids:
+            st = _exitstatus(os.wait()[1])
+            if st and not problem:
+                problem[0] = st
+                killworkers()
+    t = threading.Thread(target=waitforworkers)
+    t.start()
     def cleanup():
-        # python 2.4 is too dumb for try/yield/finally
         signal.signal(signal.SIGINT, oldhandler)
-        problem = None
-        for i in xrange(workers):
-            pid, st = os.wait()
-            st = _exitstatus(st)
-            if st and not problem:
-                problem = st
-        if problem:
-            if problem < 0:
-                os.kill(os.getpid(), -problem)
-            sys.exit(problem)
+        t.join()
+        status = problem[0]
+        if status:
+            if status < 0:
+                os.kill(os.getpid(), -status)
+            sys.exit(status)
     try:
         for line in fp:
             l = line.split(' ', 1)
             yield int(l[0]), l[1][:-1]
     except: # re-raises
+        killworkers()
         cleanup()
         raise
     cleanup()