Patchwork D1458: workers: implemented worker on windows

Submitter phabricator
Date Nov. 20, 2017, 6:37 p.m.
Message ID <differential-rev-PHID-DREV-7nmbkuwv5h346jvl37wv-req@phab.mercurial-scm.org>
Permalink /patch/25669/
State Superseded

Comments

phabricator - Nov. 20, 2017, 6:37 p.m.
wlis created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This change implements a thread-based worker on Windows.
  The handling of exceptions from within threads will happen in a separate diff.
  
  The worker is currently used in mercurial/merge.py and in the lfs extension.
  
  After multiple test runs, with millions of files materialized and thousands
  of lfs blobs fetched, it appears that both merge.py and lfs/blobstore.py are
  thread safe. I also looked through the code and, besides the
  backgroundfilecloser (handled in the base of this change), things look good.
  
  The performance boost of this on Windows:
  
  - ~50% for hg sparse --enable-profile
  - speedup of hg up/rebase (not precisely measured)
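
The thread-based worker in the attached patch can be sketched, in simplified form, as follows. This is an illustrative standalone version, not the patch itself: names are made up, and unlike the patch (which yields results while workers are still running) this sketch collects results only after all threads finish.

```python
import queue
import threading

def threadworker(func, staticargs, args, numworkers=4):
    # Fill a task queue, let worker threads drain it, then collect
    # everything the workers pushed onto the result queue.
    taskqueue = queue.Queue()
    resultqueue = queue.Queue()
    for a in args:
        taskqueue.put(a)

    def run():
        while True:
            try:
                a = taskqueue.get_nowait()
            except queue.Empty:
                # queue drained; this worker is done
                break
            # func is a generator taking staticargs plus one task arg,
            # mirroring the calling convention in the patch
            for res in func(*staticargs + (a,)):
                resultqueue.put(res)

    threads = [threading.Thread(target=run) for _ in range(numworkers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    results = []
    while not resultqueue.empty():
        results.append(resultqueue.get())
    return results
```

Because several threads drain one shared queue, result ordering is nondeterministic; callers that care about order must sort or tag results themselves.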

TEST PLAN
  Ran tens of hg sparse --enable-profile and --disable-profile operations on
  large profiles and verified that workers run. Used the Sysinternals suite to
  confirm that all threads are spawned and run as they should.
  
  Ran various other operations on the repo, including update and rebase.
  
  Ran the test suite on CentOS; all tests that pass on @ pass here.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1458

AFFECTED FILES
  mercurial/worker.py

CHANGE DETAILS




To: wlis, #hg-reviewers
Cc: mercurial-devel
phabricator - Nov. 21, 2017, 4:03 a.m.
indygreg added a comment.


  I haven't looked at the code in much detail.
  
  The use of threads for the workers on Windows is obviously better performance-wise than the current world, where we use a single thread in a single process. However, we'll hit the upper limit of performance pretty quickly because the process will be CPU-bound in revlogs. And the upper limit with Python threads will be much worse than the upper limit with a concurrency primitive that doesn't have the GIL.
  
  FWIW, last I checked we only had one consumer of the worker API in core: working directory updates. Given the limited users of this API and the inability to easily implement multi-process workers on Windows (figuring out how to invoke a new Python process is not trivial because of the ways Mercurial can be invoked on Windows), I've been very tempted to remove the worker API. I'm actually thinking about replacing its one use case with Rust. But I don't have any code to show for that and likely won't for several weeks. That being said, I don't want to discourage this code from landing. If others feel it is useful, let's get it in. But its lifetime may be short if the Rust solution materializes.
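
The GIL ceiling described here can be illustrated with a small self-contained sketch (the function and iteration counts are invented for the demonstration): on stock CPython, running a CPU-bound function on four threads takes roughly as long as running it four times sequentially, because only one thread executes Python bytecode at a time.

```python
import threading
import time

def busy(n):
    # CPU-bound loop; on CPython it holds the GIL while computing
    total = 0
    for i in range(n):
        total += i * i
    return total

def run_threaded(n, nthreads):
    # Run busy(n) once per thread and time the whole batch.
    results = [None] * nthreads

    def work(slot):
        results[slot] = busy(n)

    threads = [threading.Thread(target=work, args=(i,))
               for i in range(nthreads)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start, results

if __name__ == '__main__':
    start = time.perf_counter()
    expected = [busy(200000) for _ in range(4)]
    seq = time.perf_counter() - start
    par, results = run_threaded(200000, 4)
    assert results == expected
    # On CPython with the GIL, par is close to seq rather than seq / 4,
    # because the threads' bytecode execution is serialized.
    print('sequential %.3fs, 4 threads %.3fs' % (seq, par))
```

The absolute times are machine-dependent; the point is only the ratio. I/O-bound work, by contrast, does overlap under threads, which is why the thread worker still helps for filesystem-heavy operations like working directory updates.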

phabricator - Nov. 21, 2017, 5:08 a.m.
wlis planned changes to this revision.
wlis added a comment.


  I need to test these changes a bit more. I found a place in merge.py that has
  a risk of a race condition and need to figure out how to protect it.
  Right now there are two places where we use workers: one in core (merge.py)
  and one in our lfs code in the fb extensions.
  
  We actually hit the CPU limit right away when we start using threading. I
  initially looked into implementing something closer to what we do on posix,
  but as you say that is not easy on Windows, as fork doesn't exist. From my
  perspective, putting the applyupdates code into Rust and starting multiple
  processes with it would work, and as you say it gives more room for perf
  improvement than this change.
  
  I am happy with making this code short-lived if it gets replaced by a better
  solution.

phabricator - Nov. 25, 2017, 4:50 p.m.
wlis added a comment.


  The previous issues were related to fb-hgext remotefilelog, and
  https://phab.mercurial-scm.org/D1513 fixes them on the remotefilelog side.
  I still need to test this code on a repo without remotefilelog to make sure
  that the normal filelog doesn't hit similar issues.

phabricator - Dec. 5, 2017, 10:26 p.m.
durin42 added a comment.


  Did you test this on a non-remotefilelog repo?

phabricator - Dec. 12, 2017, 9:26 p.m.
wlis added a comment.


  @durin42 yes, I tested without remotefilelog (at least I believe it was not
  being used at that time). I cloned a repo with --config
  extensions.remotefilelog=! and then put the appropriate section in .hg/hgrc.
  Ran updates between distant revisions and verified that threads get started
  during update. Hg didn't complain about anything and the repo stayed healthy.


Patch

diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -11,6 +11,7 @@ 
 import os
 import signal
 import sys
+import threading
 
 from .i18n import _
 from . import (
@@ -53,7 +54,7 @@ 
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
-if pycompat.isposix:
+if pycompat.isposix or pycompat.iswindows:
     _startupcost = 0.01
 else:
     _startupcost = 1e30
@@ -203,7 +204,51 @@ 
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-if not pycompat.iswindows:
+def _windowsworker(ui, func, staticargs, args):
+    class Worker(threading.Thread):
+        def __init__(self, taskqueue, resultqueue, func, staticargs,
+                     group=None, target=None, name=None, verbose=None):
+            threading.Thread.__init__(self, group=group, target=target,
+                                      name=name, verbose=verbose)
+            self._taskqueue = taskqueue
+            self._resultqueue = resultqueue
+            self._func = func
+            self._staticargs = staticargs
+
+        def run(self):
+            while not self._taskqueue.empty():
+                try:
+                    args = self._taskqueue.get_nowait()
+                    for res in self._func(*self._staticargs + (args,)):
+                        self._resultqueue.put(res)
+                except util.empty:
+                    break
+
+    workers = _numworkers(ui)
+    threads = []
+    resultqueue = util.queue()
+    taskqueue = util.queue()
+    # partition work to more pieces than workers to minimize the chance
+    # of uneven distribution of large tasks between the workers
+    for pargs in partition(args, workers * 20):
+        taskqueue.put(pargs)
+    for _i in range(workers):
+        t = Worker(taskqueue, resultqueue, func, staticargs)
+        threads.append(t)
+        t.start()
+    while any(t.is_alive() for t in threads):
+        while not resultqueue.empty():
+            yield resultqueue.get()
+        t = threads[0]
+        t.join(0.05)
+        if not t.is_alive():
+            threads.remove(t)
+    while not resultqueue.empty():
+        yield resultqueue.get()
+
+if pycompat.iswindows:
+    _platformworker = _windowsworker
+else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
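
For reference, the partition() helper that _windowsworker calls via partition(args, workers * 20) can be sketched as a round-robin slicer. This mirrors the scheme already used by the posix worker in mercurial/worker.py, though the exact implementation there may differ:

```python
def partition(lst, nslices):
    # Round-robin partition: slice i receives lst[i::nslices], so large
    # adjacent tasks are spread across slices instead of clumping into one.
    for i in range(nslices):
        yield lst[i::nslices]
```

Cutting the work into many more slices than workers (workers * 20 in the patch) keeps all threads busy even when individual tasks vary widely in cost, at the price of more queue operations.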