Patchwork D1459: workers: handling exceptions in windows workers

Submitter phabricator
Date Dec. 15, 2017, 5:13 p.m.
Message ID <3031f0438821ecfb57aac5d8068783bb@localhost.localdomain>
Permalink /patch/26317/
State Not Applicable

Comments

phabricator - Dec. 15, 2017, 5:13 p.m.
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG71427ff1dff8: workers: handling exceptions in windows workers (authored by wlis, committed by ).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D1459?vs=3666&id=4486

REVISION DETAIL
  https://phab.mercurial-scm.org/D1459

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS




To: wlis, #hg-reviewers, durin42
Cc: mercurial-devel

Patch

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -214,18 +214,45 @@ 
             self._resultqueue = resultqueue
             self._func = func
             self._staticargs = staticargs
+            self._interrupted = False
+            self.exception = None
+
+        def interrupt(self):
+            self._interrupted = True
 
         def run(self):
-            while not self._taskqueue.empty():
-                try:
-                    args = self._taskqueue.get_nowait()
-                    for res in self._func(*self._staticargs + (args,)):
-                        self._resultqueue.put(res)
-                except util.empty:
-                    break
+            try:
+                while not self._taskqueue.empty():
+                    try:
+                        args = self._taskqueue.get_nowait()
+                        for res in self._func(*self._staticargs + (args,)):
+                            self._resultqueue.put(res)
+                            # threading doesn't provide a native way to
+                            # interrupt execution. handle it manually at every
+                            # iteration.
+                            if self._interrupted:
+                                return
+                    except util.empty:
+                        break
+            except Exception as e:
+                # store the exception such that the main thread can resurface
+                # it as if the func was running without workers.
+                self.exception = e
+                raise
+
+    threads = []
+    def killworkers():
+        for t in threads:
+            t.interrupt()
+        for t in threads:
+            # try to let the threads handle interruption, but don't wait
+            # indefinitely. the thread could be in an infinite loop, handling
+            # a very long task, or in a deadlock situation
+            t.join(5)
+            if t.is_alive():
+                raise error.Abort(_('failed to join worker thread'))
 
     workers = _numworkers(ui)
-    threads = []
     resultqueue = util.queue()
     taskqueue = util.queue()
     # partition work to more pieces than workers to minimize the chance
@@ -236,12 +263,24 @@ 
         t = Worker(taskqueue, resultqueue, func, staticargs)
         threads.append(t)
         t.start()
-    while any(t.is_alive() for t in threads):
+
+    while len(threads) > 0:
         while not resultqueue.empty():
             yield resultqueue.get()
-        t = threads[0]
-        t.join(0.05)
-        if not t.is_alive():
+        threads[0].join(0.05)
+        finishedthreads = [_t for _t in threads if not _t.is_alive()]
+        for t in finishedthreads:
+            if t.exception is not None:
+                try:
+                    killworkers()
+                except Exception:
+                    # ignore a failure to join the workers. it is more
+                    # important to surface the initial exception than the
+                    # fact that one of the workers may be processing a large
+                    # task and does not get to handle the interruption.
+                    ui.warn(_("failed to kill worker threads while handling "
+                              "an exception\n"))
+                raise t.exception
             threads.remove(t)
     while not resultqueue.empty():
         yield resultqueue.get()
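
The pattern in this patch can be tried outside Mercurial. What follows is a minimal standalone sketch, not the committed code: it assumes Python 3 with the standard `queue` and `threading` modules in place of `util.queue`, and the name `run_tasks` and its `numworkers` parameter are hypothetical. Unlike the patch, the worker only stores the exception and does not re-raise it inside the thread, which keeps the thread's traceback off stderr.

```python
import queue
import threading


class Worker(threading.Thread):
    """Worker thread that records an exception raised by its task function."""

    def __init__(self, taskqueue, resultqueue, func):
        super().__init__()
        self._taskqueue = taskqueue
        self._resultqueue = resultqueue
        self._func = func
        self._interrupted = False
        self.exception = None

    def interrupt(self):
        # Threads cannot be interrupted from outside, so only set a flag
        # that run() checks after every result.
        self._interrupted = True

    def run(self):
        try:
            while True:
                try:
                    args = self._taskqueue.get_nowait()
                except queue.Empty:
                    break
                for res in self._func(args):
                    self._resultqueue.put(res)
                    if self._interrupted:
                        return
        except Exception as e:
            # Keep the exception so the main thread can raise it again.
            # (The patch also re-raises here; this sketch does not.)
            self.exception = e


def run_tasks(func, tasks, numworkers=2):
    """Yield results of func(task) for each task; re-raise worker errors."""
    taskqueue = queue.Queue()
    resultqueue = queue.Queue()
    for task in tasks:
        taskqueue.put(task)
    threads = [Worker(taskqueue, resultqueue, func) for _ in range(numworkers)]
    for t in threads:
        t.start()
    while threads:
        while not resultqueue.empty():
            yield resultqueue.get()
        threads[0].join(0.05)
        finished = [w for w in threads if not w.is_alive()]
        for t in finished:
            if t.exception is not None:
                # Ask every worker to stop, wait a bounded time for each,
                # then surface the original exception in the main thread.
                for w in threads:
                    w.interrupt()
                for w in threads:
                    w.join(5)
                raise t.exception
            threads.remove(t)
    # Drain results that arrived after the last worker exited.
    while not resultqueue.empty():
        yield resultqueue.get()
```

Here `func` is expected to be a generator function taking one task, mirroring how the patch iterates over `self._func(...)`. A caller consuming `run_tasks(func, tasks)` sees a worker's exception raised from the generator, as if `func` had run without workers.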