| Submitter | phabricator |
|---|---|
| Date | Nov. 20, 2017, 6:37 p.m. |
| Message ID | `<differential-rev-PHID-DREV-7nmbkuwv5h346jvl37wv-req@phab.mercurial-scm.org>` |
| Download | mbox, patch |
| Permalink | /patch/25669/ |
| State | Superseded |
Comments
indygreg added a comment.

I haven't looked at the code in much detail. The use of threads for the workers on Windows is obviously better performance-wise than the current world, where we use a single thread in a single process. However, we'll hit the upper limit of performance pretty quickly because the process will be CPU-bound in revlogs. And the upper limit with Python threads will be much worse than the upper limit with a concurrency primitive that doesn't have the GIL.

FWIW, last I checked we only had one consumer of the worker API in core: working directory updates. Given the limited users of this API and the inability to easily implement multi-process workers on Windows (figuring out how to invoke a new Python process is not trivial because of the ways Mercurial can be invoked on Windows), I've been very tempted to remove the worker API. I'm actually thinking about replacing its one use case with Rust. But I don't have any code to show for that and likely won't for several weeks.

That being said, I don't want to discourage this code from landing. If others feel it is useful, let's get it in. But its lifetime may be short if the Rust solution materializes.

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D1458

To: wlis, #hg-reviewers
Cc: indygreg, mercurial-devel
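The GIL ceiling indygreg describes can be made concrete with a toy illustration. This is a hedged sketch, not Mercurial code: CPython's GIL allows only one thread to execute Python bytecode at a time, so pure-Python CPU-bound work (like the revlog paths mentioned above) gains correctness but little speed from threads.

```python
import threading

# Illustrative only: CPU-bound pure-Python work split across threads.
# Each thread runs correctly, but under CPython's GIL the threads take
# turns executing bytecode, so wall-clock time is roughly the SUM of
# the per-thread work, not the max.
def count(n, out, idx):
    total = 0
    for _ in range(n):
        total += 1
    out[idx] = total

results = [0, 0]
threads = [threading.Thread(target=count, args=(100000, results, i))
           for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sum(results))  # both threads finished their full workload
```

A multi-process design (or, as suggested above, a Rust implementation) sidesteps the GIL entirely, which is why the upper limit there is so much higher.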
wlis planned changes to this revision.
wlis added a comment.

I need to test these changes a bit more. I found a place in merge.py that has a risk of a race condition, and I need to figure out how to protect it.

Right now there are two places where we use workers: one in core (merge.py), and we also use it in lfs in the fb extensions. We actually hit the CPU limit right away when we start using threading, and I initially looked into implementing something closer to what we do on posix, but as you say that is not easy on Windows since fork doesn't exist.

I think that from my perspective, putting the applyupdates code into Rust and starting multiple processes with that would work, and as you say it gives more room for perf improvement than this change. I am happy with making this code short-lived if it gets replaced by a better solution.
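The class of hazard described above — worker threads mutating shared state — has a standard mitigation. This is a generic sketch with hypothetical names, not the actual merge.py code or fix: a read-modify-write on shared data must be serialized with a lock, or concurrent updates can be lost.

```python
import threading

# Hypothetical example (not Mercurial code): several worker threads
# record results into a shared dict. The unlocked version of record()
# would do get() and store as two separate steps, so two threads could
# read the same old value and one increment would be lost.
class SharedCounter(object):
    def __init__(self):
        self._lock = threading.Lock()
        self._counts = {}

    def record(self, key):
        # the lock makes the read-modify-write atomic w.r.t. other threads
        with self._lock:
            self._counts[key] = self._counts.get(key, 0) + 1

counter = SharedCounter()
threads = [threading.Thread(
               target=lambda: [counter.record('file.txt')
                               for _ in range(1000)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter._counts['file.txt'])  # → 4000, no lost updates
```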
wlis added a comment.

The previous issues were related to fb-hgext remotefilelog, and https://phab.mercurial-scm.org/D1513 fixes them on the remotefilelog side. I will still need to test this code on a repo without remotefilelog to make sure that the normal filelog doesn't hit similar issues.
durin42 added a comment.

Did you test this on a non-remotefilelog repo?
wlis added a comment.

@durin42 yes, I tested without remotefilelog (at least I believe it was not being used at that time). I cloned a repo with --config extensions.remotefilelog=! and then put the appropriate section in .hg/hgrc. I ran updates between far-apart revisions and verified that threads get started during the update. Hg didn't complain about anything and the repo stayed healthy.
Patch
```diff
diff --git a/mercurial/worker.py b/mercurial/worker.py
--- a/mercurial/worker.py
+++ b/mercurial/worker.py
@@ -11,6 +11,7 @@
 import os
 import signal
 import sys
+import threading
 
 from .i18n import _
 from . import (
@@ -53,7 +54,7 @@
             raise error.Abort(_('number of cpus must be an integer'))
     return min(max(countcpus(), 4), 32)
 
-if pycompat.isposix:
+if pycompat.isposix or pycompat.iswindows:
     _startupcost = 0.01
 else:
     _startupcost = 1e30
@@ -203,7 +204,51 @@
     elif os.WIFSIGNALED(code):
         return -os.WTERMSIG(code)
 
-if not pycompat.iswindows:
+def _windowsworker(ui, func, staticargs, args):
+    class Worker(threading.Thread):
+        def __init__(self, taskqueue, resultqueue, func, staticargs,
+                     group=None, target=None, name=None, verbose=None):
+            threading.Thread.__init__(self, group=group, target=target,
+                                      name=name, verbose=verbose)
+            self._taskqueue = taskqueue
+            self._resultqueue = resultqueue
+            self._func = func
+            self._staticargs = staticargs
+
+        def run(self):
+            while not self._taskqueue.empty():
+                try:
+                    args = self._taskqueue.get_nowait()
+                    for res in self._func(*self._staticargs + (args,)):
+                        self._resultqueue.put(res)
+                except util.empty:
+                    break
+
+    workers = _numworkers(ui)
+    threads = []
+    resultqueue = util.queue()
+    taskqueue = util.queue()
+    # partition work to more pieces than workers to minimize the chance
+    # of uneven distribution of large tasks between the workers
+    for pargs in partition(args, workers * 20):
+        taskqueue.put(pargs)
+    for _i in range(workers):
+        t = Worker(taskqueue, resultqueue, func, staticargs)
+        threads.append(t)
+        t.start()
+    while any(t.is_alive() for t in threads):
+        while not resultqueue.empty():
+            yield resultqueue.get()
+        t = threads[0]
+        t.join(0.05)
+        if not t.is_alive():
+            threads.remove(t)
+    while not resultqueue.empty():
+        yield resultqueue.get()
+
+if pycompat.iswindows:
+    _platformworker = _windowsworker
+else:
     _platformworker = _posixworker
     _exitstatus = _posixexitstatus
```
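For readers unfamiliar with the pattern in `_windowsworker`, here is a self-contained sketch of the same task-queue/result-queue approach using only the standard library (Mercurial's `util.queue` and `util.empty` appear to be aliases for the stdlib queue `Queue` and `Empty`). The function and argument names below are illustrative, not Mercurial's API.

```python
import queue
import threading

# Sketch of the thread-pool worker pattern from the patch: tasks are
# pre-loaded into a shared queue, worker threads drain it with
# get_nowait() and push each result onto a result queue, and the caller
# yields results back out.
def run_workers(func, staticargs, args, nworkers=4):
    taskqueue = queue.Queue()
    resultqueue = queue.Queue()
    for item in args:
        taskqueue.put(item)

    def worker():
        while True:
            try:
                item = taskqueue.get_nowait()
            except queue.Empty:
                break  # no tasks left; this worker is done
            for res in func(*staticargs + (item,)):
                resultqueue.put(res)

    threads = [threading.Thread(target=worker) for _ in range(nworkers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    while not resultqueue.empty():
        yield resultqueue.get()

# Usage: square each input across worker threads. Like the patch's
# func, the task function is a generator yielding results.
def square(prefix, x):
    yield (prefix, x * x)

out = sorted(run_workers(square, ('sq',), [1, 2, 3, 4]))
print(out)  # → [('sq', 1), ('sq', 4), ('sq', 9), ('sq', 16)]
```

The patch over-partitions the work (`workers * 20` pieces) so a single slow task doesn't leave other threads idle, and polls with `join(0.05)` so results can be yielded while workers are still running; this sketch simplifies both points by joining all threads before draining.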