Submitter | Pierre-Yves David |
---|---|
Date | Dec. 2, 2019, 10:50 a.m. |
Message ID | <096a7d53095e3ecf883d.1575283819@nodosa.octobus.net> |
Download | mbox | patch |
Permalink | /patch/43555/ |
State | Accepted |
Headers | show |
Comments
> On Dec 2, 2019, at 05:50, Pierre-Yves David <pierre-yves.david@ens-lyon.org> wrote: > > # HG changeset patch > # User Pierre-Yves David <pierre-yves.david@octobus.net> > # Date 1569765632 -7200 > # Sun Sep 29 16:00:32 2019 +0200 > # Node ID 096a7d53095e3ecf883d1a743c84aaf8d4f15fe2 > # Parent eb24da344625c3c7c34ff71abce165889de3cc2f > # EXP-Topic sidedata-copies-perf > # Available At https://dev.heptapod.net/octobus/mercurial-devel/ > # hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 096a7d53095e > upgraderepo: add a config option for parallel computation queued, but I'd appreciate seeing numbers in the near-ish future for how much time this saves, or we should drop the use of multiprocessing IMO > > The option is put to use to compute new copy tracing side data in parallel. It uses the multiprocessing module as it had the appropriate primitives for what we needed. Gregory Szorc had concerns on Windows so we disabled it there. > > See the inline comment for details on the parallel implementation. 
> > diff --git a/mercurial/configitems.py b/mercurial/configitems.py > --- a/mercurial/configitems.py > +++ b/mercurial/configitems.py > @@ -706,6 +706,9 @@ coreconfigitem( > b'experimental', b'worker.wdir-get-thread-safe', default=False, > ) > coreconfigitem( > + b'experimental', b'worker.repository-upgrade', default=False, > +) > +coreconfigitem( > b'experimental', b'xdiff', default=False, > ) > coreconfigitem( > diff --git a/mercurial/copies.py b/mercurial/copies.py > --- a/mercurial/copies.py > +++ b/mercurial/copies.py > @@ -8,6 +8,7 @@ > from __future__ import absolute_import > > import collections > +import multiprocessing > import os > > from .i18n import _ > @@ -989,6 +990,102 @@ def _getsidedata(srcrepo, rev): > > > def getsidedataadder(srcrepo, destrepo): > + use_w = srcrepo.ui.configbool('experimental', 'worker.repository-upgrade') > + if pycompat.iswindows or not use_w: > + return _get_simple_sidedata_adder(srcrepo, destrepo) > + else: > + return _get_worker_sidedata_adder(srcrepo, destrepo) > + > + > +def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens): > + """The function used by worker precomputing sidedata > + > + It read an input queue containing revision numbers > + It write in an output queue containing (rev, <sidedata-map>) > + > + The `None` input value is used as a stop signal. > + > + The `tokens` semaphore is user to avoid having too many unprocessed > + entries. The workers needs to acquire one token before fetching a task. > + They will be released by the consumer of the produced data. > + """ > + tokens.acquire() > + rev = revs_queue.get() > + while rev is not None: > + data = _getsidedata(srcrepo, rev) > + sidedata_queue.put((rev, data)) > + tokens.acquire() > + rev = revs_queue.get() > + # processing of `None` is completed, release the token. 
> + tokens.release() > + > + > +BUFF_PER_WORKER = 50 > + > + > +def _get_worker_sidedata_adder(srcrepo, destrepo): > + """The parallel version of the sidedata computation > + > + This code spawn a pool of worker that precompute a buffer of sidedata > + before we actually need them""" > + # avoid circular import copies -> scmutil -> worker -> copies > + from . import worker > + > + nbworkers = worker._numworkers(srcrepo.ui) > + > + tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER) > + revsq = multiprocessing.Queue() > + sidedataq = multiprocessing.Queue() > + > + assert srcrepo.filtername is None > + # queue all tasks beforehand, revision numbers are small and it make > + # synchronisation simpler > + # > + # Since the computation for each node can be quite expensive, the overhead > + # of using a single queue is not revelant. In practice, most computation > + # are fast but some are very expensive and dominate all the other smaller > + # cost. > + for r in srcrepo.changelog.revs(): > + revsq.put(r) > + # queue the "no more tasks" markers > + for i in range(nbworkers): > + revsq.put(None) > + > + allworkers = [] > + for i in range(nbworkers): > + args = (srcrepo, revsq, sidedataq, tokens) > + w = multiprocessing.Process(target=_sidedata_worker, args=args) > + allworkers.append(w) > + w.start() > + > + # dictionnary to store results for revision higher than we one we are > + # looking for. For example, if we need the sidedatamap for 42, and 43 is > + # received, when shelve 43 for later use. > + staging = {} > + > + def sidedata_companion(revlog, rev): > + sidedata = {} > + if util.safehasattr(revlog, b'filteredrevs'): # this is a changelog > + # Is the data previously shelved ? 
> + sidedata = staging.pop(rev, None) > + if sidedata is None: > + # look at the queued result until we find the one we are lookig > + # for (shelve the other ones) > + r, sidedata = sidedataq.get() > + while r != rev: > + staging[r] = sidedata > + r, sidedata = sidedataq.get() > + tokens.release() > + return False, (), sidedata > + > + return sidedata_companion > + > + > +def _get_simple_sidedata_adder(srcrepo, destrepo): > + """The simple version of the sidedata computation > + > + It just compute it in the same thread on request""" > + > def sidedatacompanion(revlog, rev): > sidedata = {} > if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog > _______________________________________________ > Mercurial-devel mailing list > Mercurial-devel@mercurial-scm.org > https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
On 12/6/19 8:11 PM, Augie Fackler wrote: > > >> On Dec 2, 2019, at 05:50, Pierre-Yves David <pierre-yves.david@ens-lyon.org> wrote: >> >> # HG changeset patch >> # User Pierre-Yves David <pierre-yves.david@octobus.net> >> # Date 1569765632 -7200 >> # Sun Sep 29 16:00:32 2019 +0200 >> # Node ID 096a7d53095e3ecf883d1a743c84aaf8d4f15fe2 >> # Parent eb24da344625c3c7c34ff71abce165889de3cc2f >> # EXP-Topic sidedata-copies-perf >> # Available At https://dev.heptapod.net/octobus/mercurial-devel/ >> # hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 096a7d53095e >> upgraderepo: add a config option for parallel computation > > queued, but I'd appreciate seeing numbers in the near-ish future for how much time this saves, or we should drop the use of multiprocessing IMO For smaller repos this provides a significant boost. For example, the total time of `hg debugupgraderepo --changelog --run` on pypy moves from 352s to 120s (including the copying of the filelog/manifestlog). On larger repositories, it makes things move from days to hours.
Patch
diff --git a/mercurial/configitems.py b/mercurial/configitems.py --- a/mercurial/configitems.py +++ b/mercurial/configitems.py @@ -706,6 +706,9 @@ coreconfigitem( b'experimental', b'worker.wdir-get-thread-safe', default=False, ) coreconfigitem( + b'experimental', b'worker.repository-upgrade', default=False, +) +coreconfigitem( b'experimental', b'xdiff', default=False, ) coreconfigitem( diff --git a/mercurial/copies.py b/mercurial/copies.py --- a/mercurial/copies.py +++ b/mercurial/copies.py @@ -8,6 +8,7 @@ from __future__ import absolute_import import collections +import multiprocessing import os from .i18n import _ @@ -989,6 +990,102 @@ def _getsidedata(srcrepo, rev): def getsidedataadder(srcrepo, destrepo): + use_w = srcrepo.ui.configbool('experimental', 'worker.repository-upgrade') + if pycompat.iswindows or not use_w: + return _get_simple_sidedata_adder(srcrepo, destrepo) + else: + return _get_worker_sidedata_adder(srcrepo, destrepo) + + +def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens): + """The function used by worker precomputing sidedata + + It read an input queue containing revision numbers + It write in an output queue containing (rev, <sidedata-map>) + + The `None` input value is used as a stop signal. + + The `tokens` semaphore is user to avoid having too many unprocessed + entries. The workers needs to acquire one token before fetching a task. + They will be released by the consumer of the produced data. + """ + tokens.acquire() + rev = revs_queue.get() + while rev is not None: + data = _getsidedata(srcrepo, rev) + sidedata_queue.put((rev, data)) + tokens.acquire() + rev = revs_queue.get() + # processing of `None` is completed, release the token. 
+ tokens.release() + + +BUFF_PER_WORKER = 50 + + +def _get_worker_sidedata_adder(srcrepo, destrepo): + """The parallel version of the sidedata computation + + This code spawn a pool of worker that precompute a buffer of sidedata + before we actually need them""" + # avoid circular import copies -> scmutil -> worker -> copies + from . import worker + + nbworkers = worker._numworkers(srcrepo.ui) + + tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER) + revsq = multiprocessing.Queue() + sidedataq = multiprocessing.Queue() + + assert srcrepo.filtername is None + # queue all tasks beforehand, revision numbers are small and it make + # synchronisation simpler + # + # Since the computation for each node can be quite expensive, the overhead + # of using a single queue is not revelant. In practice, most computation + # are fast but some are very expensive and dominate all the other smaller + # cost. + for r in srcrepo.changelog.revs(): + revsq.put(r) + # queue the "no more tasks" markers + for i in range(nbworkers): + revsq.put(None) + + allworkers = [] + for i in range(nbworkers): + args = (srcrepo, revsq, sidedataq, tokens) + w = multiprocessing.Process(target=_sidedata_worker, args=args) + allworkers.append(w) + w.start() + + # dictionnary to store results for revision higher than we one we are + # looking for. For example, if we need the sidedatamap for 42, and 43 is + # received, when shelve 43 for later use. + staging = {} + + def sidedata_companion(revlog, rev): + sidedata = {} + if util.safehasattr(revlog, b'filteredrevs'): # this is a changelog + # Is the data previously shelved ? 
+ sidedata = staging.pop(rev, None) + if sidedata is None: + # look at the queued result until we find the one we are lookig + # for (shelve the other ones) + r, sidedata = sidedataq.get() + while r != rev: + staging[r] = sidedata + r, sidedata = sidedataq.get() + tokens.release() + return False, (), sidedata + + return sidedata_companion + + +def _get_simple_sidedata_adder(srcrepo, destrepo): + """The simple version of the sidedata computation + + It just compute it in the same thread on request""" + def sidedatacompanion(revlog, rev): sidedata = {} if util.safehasattr(revlog, 'filteredrevs'): # this is a changelog