Patchwork [2,of,2,V2] upgraderepo: add a config option for parallel computation

Submitter Pierre-Yves David
Date Dec. 2, 2019, 10:50 a.m.
Message ID <096a7d53095e3ecf883d.1575283819@nodosa.octobus.net>
Permalink /patch/43555/
State Accepted

Comments

Pierre-Yves David - Dec. 2, 2019, 10:50 a.m.
# HG changeset patch
# User Pierre-Yves David <pierre-yves.david@octobus.net>
# Date 1569765632 -7200
#      Sun Sep 29 16:00:32 2019 +0200
# Node ID 096a7d53095e3ecf883d1a743c84aaf8d4f15fe2
# Parent  eb24da344625c3c7c34ff71abce165889de3cc2f
# EXP-Topic sidedata-copies-perf
# Available At https://dev.heptapod.net/octobus/mercurial-devel/
#              hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 096a7d53095e
upgraderepo: add a config option for parallel computation

The option is put to use to compute the new copy tracing sidedata in parallel.
It uses the multiprocessing module, as it has the appropriate primitives for
what we need. Gregory Szorc had concerns about the behavior on Windows, so we
disabled it there.

See inline comment for details on the parallel implementation.
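
The coordination pattern is small enough to sketch standalone. The toy example
below shows the same arrangement (queues, bounded semaphore, `None` sentinels,
and in-order consumption with a staging dictionary); the names and the trivial
computation standing in for the real sidedata work are illustrative only, not
part of the patch:

import multiprocessing

BUFF_PER_WORKER = 2  # the patch uses 50; kept tiny for the example


def _toy_worker(revs_queue, sidedata_queue, tokens):
    # one token is acquired per fetched task (including the `None`
    # sentinel) so that only a bounded number of computed entries sit
    # unconsumed in the output queue
    tokens.acquire()
    rev = revs_queue.get()
    while rev is not None:
        data = {rev: rev * rev}  # stand-in for the expensive computation
        sidedata_queue.put((rev, data))
        tokens.acquire()
        rev = revs_queue.get()
    # give back the token taken for the `None` sentinel
    tokens.release()


def main():
    nbworkers = 2
    nbrevs = 10
    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
    revsq = multiprocessing.Queue()
    sidedataq = multiprocessing.Queue()

    # queue all tasks beforehand, then one stop signal per worker
    for r in range(nbrevs):
        revsq.put(r)
    for _ in range(nbworkers):
        revsq.put(None)

    workers = []
    for _ in range(nbworkers):
        w = multiprocessing.Process(
            target=_toy_worker, args=(revsq, sidedataq, tokens)
        )
        w.start()
        workers.append(w)

    # consume in revision order, shelving out-of-order arrivals and
    # releasing one token per consumed entry
    staging = {}
    for rev in range(nbrevs):
        data = staging.pop(rev, None)
        if data is None:
            r, data = sidedataq.get()
            while r != rev:
                staging[r] = data
                r, data = sidedataq.get()
        tokens.release()
        print(rev, data)

    for w in workers:
        w.join()


if __name__ == '__main__':
    main()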
Augie Fackler - Dec. 6, 2019, 7:11 p.m.
> On Dec 2, 2019, at 05:50, Pierre-Yves David <pierre-yves.david@ens-lyon.org> wrote:
> 
> [...]
> upgraderepo: add a config option for parallel computation

queued, but I'd appreciate seeing numbers in the near-ish future for how much time this saves, or we should drop the use of multiprocessing IMO

> [...]
Pierre-Yves David - Dec. 7, 2019, 10:25 a.m.
On 12/6/19 8:11 PM, Augie Fackler wrote:
> 
>> On Dec 2, 2019, at 05:50, Pierre-Yves David <pierre-yves.david@ens-lyon.org> wrote:
>>
>> [...]
>> upgraderepo: add a config option for parallel computation
> 
> queued, but I'd appreciate seeing numbers in the near-ish future for how much time this saves, or we should drop the use of multiprocessing IMO

For smaller repositories this provides a significant boost. For example, the 
total time of `hg debugupgraderepo --changelog --run` on pypy moves from 352s 
to 120s (including the copying of the filelog/manifestlog).

On larger repositories, it makes things move from days to hours.
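
For anyone who wants to reproduce such numbers: the option defaults to off 
(and is ignored on Windows), so it has to be enabled explicitly. Something 
along these lines should work, with the exact upgrade flags depending on what 
is being converted:

  $ hg --config experimental.worker.repository-upgrade=yes \
       debugupgraderepo --changelog --run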

Patch

diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -706,6 +706,9 @@  coreconfigitem(
     b'experimental', b'worker.wdir-get-thread-safe', default=False,
 )
 coreconfigitem(
+    b'experimental', b'worker.repository-upgrade', default=False,
+)
+coreconfigitem(
     b'experimental', b'xdiff', default=False,
 )
 coreconfigitem(
diff --git a/mercurial/copies.py b/mercurial/copies.py
--- a/mercurial/copies.py
+++ b/mercurial/copies.py
@@ -8,6 +8,7 @@ 
 from __future__ import absolute_import
 
 import collections
+import multiprocessing
 import os
 
 from .i18n import _
@@ -989,6 +990,102 @@  def _getsidedata(srcrepo, rev):
 
 
 def getsidedataadder(srcrepo, destrepo):
+    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
+    if pycompat.iswindows or not use_w:
+        return _get_simple_sidedata_adder(srcrepo, destrepo)
+    else:
+        return _get_worker_sidedata_adder(srcrepo, destrepo)
+
+
+def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
+    """The function used by worker precomputing sidedata
+
+    It read an input queue containing revision numbers
+    It write in an output queue containing (rev, <sidedata-map>)
+
+    The `None` input value is used as a stop signal.
+
+    The `tokens` semaphore is user to avoid having too many unprocessed
+    entries. The workers needs to acquire one token before fetching a task.
+    They will be released by the consumer of the produced data.
+    """
+    tokens.acquire()
+    rev = revs_queue.get()
+    while rev is not None:
+        data = _getsidedata(srcrepo, rev)
+        sidedata_queue.put((rev, data))
+        tokens.acquire()
+        rev = revs_queue.get()
+    # processing of `None` is completed, release the token.
+    tokens.release()
+
+
+BUFF_PER_WORKER = 50
+
+
+def _get_worker_sidedata_adder(srcrepo, destrepo):
+    """The parallel version of the sidedata computation
+
+    This code spawns a pool of workers that precompute a buffer of sidedata
+    before we actually need it"""
+    # avoid circular import copies -> scmutil -> worker -> copies
+    from . import worker
+
+    nbworkers = worker._numworkers(srcrepo.ui)
+
+    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
+    revsq = multiprocessing.Queue()
+    sidedataq = multiprocessing.Queue()
+
+    assert srcrepo.filtername is None
+    # queue all tasks beforehand; revision numbers are small and it makes
+    # synchronisation simpler
+    #
+    # Since the computation for each node can be quite expensive, the overhead
+    # of using a single queue is not relevant. In practice, most computations
+    # are fast but some are very expensive and dominate all the other smaller
+    # costs.
+    for r in srcrepo.changelog.revs():
+        revsq.put(r)
+    # queue the "no more tasks" markers
+    for i in range(nbworkers):
+        revsq.put(None)
+
+    allworkers = []
+    for i in range(nbworkers):
+        args = (srcrepo, revsq, sidedataq, tokens)
+        w = multiprocessing.Process(target=_sidedata_worker, args=args)
+        allworkers.append(w)
+        w.start()
+
+    # dictionary to store results for revisions higher than the one we are
+    # looking for. For example, if we need the sidedata map for 42 and 43 is
+    # received, we shelve 43 for later use.
+    staging = {}
+
+    def sidedata_companion(revlog, rev):
+        sidedata = {}
+        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
+            # Was the data previously shelved?
+            sidedata = staging.pop(rev, None)
+            if sidedata is None:
+                # look at the queued results until we find the one we are
+                # looking for (shelve the others)
+                r, sidedata = sidedataq.get()
+                while r != rev:
+                    staging[r] = sidedata
+                    r, sidedata = sidedataq.get()
+            tokens.release()
+        return False, (), sidedata
+
+    return sidedata_companion
+
+
+def _get_simple_sidedata_adder(srcrepo, destrepo):
+    """The simple version of the sidedata computation
+
+    It just computes the sidedata in the same thread on request"""
+
     def sidedatacompanion(revlog, rev):
         sidedata = {}
         if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog