Patchwork [2,of,2] upgraderepo: add a config option for parallel computation

Submitter Pierre-Yves David
Date Nov. 29, 2019, 3:51 p.m.
Message ID <85b8caa1a166c9a31a75.1575042672@nodosa.octobus.net>
Permalink /patch/43528/
State Superseded

Comments

Pierre-Yves David - Nov. 29, 2019, 3:51 p.m.
# HG changeset patch
# User Pierre-Yves David <pierre-yves.david@octobus.net>
# Date 1569765632 -7200
#      Sun Sep 29 16:00:32 2019 +0200
# Node ID 85b8caa1a166c9a31a75c9b515a5ae3930db5acf
# Parent  55c69c19cb183f2c315cda4cacfc2d40741cf6e5
# EXP-Topic sidedata-copies-perf
# Available At https://dev.heptapod.net/octobus/mercurial-devel/
#              hg pull https://dev.heptapod.net/octobus/mercurial-devel/ -r 85b8caa1a166
upgraderepo: add a config option for parallel computation

The option is put to use to compute the new copy-tracing sidedata in
parallel. It uses the multiprocessing module, as it has the appropriate
primitives for what we need. Gregory Szorc had concerns about Windows, so
we disabled it there.
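
For example, with this patch applied, the parallel path can be enabled for
a repository upgrade with an invocation along these lines (shown for
illustration; the option defaults to off):

  $ hg debugupgraderepo --run \
      --config experimental.worker.repository-upgrade=yes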

See inline comment for details on the parallel implementation.
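
For readers who want the shape of the machinery in isolation, here is a
minimal, self-contained sketch of the same bounded producer/consumer
pattern (input queue of tasks, output queue of results, and a semaphore
providing back-pressure). The names `compute`, `worker` and `capacity`
are illustrative stand-ins, not identifiers from this patch:

  import multiprocessing


  def compute(rev):
      # stand-in for the real _getsidedata(srcrepo, rev)
      return {b'sidedata-for': rev}


  def worker(revs_queue, out_queue, tokens):
      # a worker must hold a token before pulling a task, so at most
      # `capacity` results can pile up unconsumed in `out_queue`
      tokens.acquire()
      rev = revs_queue.get()
      while rev is not None:
          out_queue.put((rev, compute(rev)))
          tokens.acquire()
          rev = revs_queue.get()
      # give back the token acquired for the `None` stop marker
      tokens.release()


  if __name__ == '__main__':
      nbworkers, capacity, ntasks = 2, 4, 10
      tokens = multiprocessing.BoundedSemaphore(capacity)
      revs_queue = multiprocessing.Queue()
      out_queue = multiprocessing.Queue()
      for rev in range(ntasks):
          revs_queue.put(rev)
      for _ in range(nbworkers):
          revs_queue.put(None)  # one stop marker per worker
      procs = []
      for _ in range(nbworkers):
          p = multiprocessing.Process(
              target=worker, args=(revs_queue, out_queue, tokens)
          )
          procs.append(p)
          p.start()
      for _ in range(ntasks):
          rev, data = out_queue.get()
          tokens.release()  # consuming a result frees a buffer slot
          print(rev, data)
      for p in procs:
          p.join()

Results arrive out of order; the patch handles this in
`sidedata_companion` by shelving results for later revisions in a
`staging` dictionary until the revision it is waiting for shows up.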

Patch

diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -706,6 +706,9 @@  coreconfigitem(
     b'experimental', b'worker.wdir-get-thread-safe', default=False,
 )
 coreconfigitem(
+    b'experimental', b'worker.repository-upgrade', default=False,
+)
+coreconfigitem(
     b'experimental', b'xdiff', default=False,
 )
 coreconfigitem(
diff --git a/mercurial/copies.py b/mercurial/copies.py
--- a/mercurial/copies.py
+++ b/mercurial/copies.py
@@ -8,6 +8,7 @@ 
 from __future__ import absolute_import
 
 import collections
+import multiprocessing
 import os
 
 from .i18n import _
@@ -989,6 +990,102 @@  def _getsidedata(srcrepo, rev):
 
 
 def getsidedataadder(srcrepo, destrepo):
+    use_w = srcrepo.ui.configbool(b'experimental', b'worker.repository-upgrade')
+    if pycompat.iswindows or not use_w:
+        return _get_simple_sidedata_adder(srcrepo, destrepo)
+    else:
+        return _get_worker_sidedata_adder(srcrepo, destrepo)
+
+
+def _sidedata_worker(srcrepo, revs_queue, sidedata_queue, tokens):
+    """The function used by worker precomputing sidedata
+
+    It read an input queue containing revision numbers
+    It write in an output queue containing (rev, <sidedata-map>)
+
+    The `None` input value is used as a stop signal.
+
+    The `tokens` semaphore is used to avoid having too many unprocessed
+    entries. Each worker needs to acquire one token before fetching a task.
+    Tokens are released by the consumer of the produced data.
+    """
+    tokens.acquire()
+    rev = revs_queue.get()
+    while rev is not None:
+        data = _getsidedata(srcrepo, rev)
+        sidedata_queue.put((rev, data))
+        tokens.acquire()
+        rev = revs_queue.get()
+    # the `None` stop marker has been processed, release its token.
+    tokens.release()
+
+
+BUFF_PER_WORKER = 50
+
+
+def _get_worker_sidedata_adder(srcrepo, destrepo):
+    """The parallel version of the sidedata computation
+
+    This code spawns a pool of workers that precompute a buffer of sidedata
+    before we actually need it"""
+    # avoid circular import copies -> scmutil -> worker -> copies
+    from . import worker
+
+    nbworkers = worker._numworkers(srcrepo.ui)
+
+    tokens = multiprocessing.BoundedSemaphore(nbworkers * BUFF_PER_WORKER)
+    revsq = multiprocessing.Queue()
+    sidedataq = multiprocessing.Queue()
+
+    assert srcrepo.filtername is None
+    # queue all tasks beforehand, revision numbers are small and it makes
+    # synchronisation simpler
+    #
+    # Since the computation for each node can be quite expensive, the overhead
+    # of using a single queue is not relevant. In practice, most computations
+    # are fast but some are very expensive and dominate all the other smaller
+    # costs.
+    for r in srcrepo.changelog.revs():
+        revsq.put(r)
+    # queue the "no more tasks" markers
+    for i in range(nbworkers):
+        revsq.put(None)
+
+    allworkers = []
+    for i in range(nbworkers):
+        args = (srcrepo, revsq, sidedataq, tokens)
+        w = multiprocessing.Process(target=_sidedata_worker, args=args)
+        allworkers.append(w)
+        w.start()
+
+    # dictionary to store results for revisions higher than the one we are
+    # looking for. For example, if we need the sidedatamap for 42, and 43 is
+    # received, we shelve 43 for later use.
+    staging = {}
+
+    def sidedata_companion(revlog, rev):
+        sidedata = {}
+        if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog
+            # Was the data previously shelved?
+            sidedata = staging.pop(rev, None)
+            if sidedata is None:
+                # look at the queued results until we find the one we are
+                # looking for (shelve the other ones)
+                r, sidedata = sidedataq.get()
+                while r != rev:
+                    staging[r] = sidedata
+                    r, sidedata = sidedataq.get()
+            tokens.release()
+        return False, (), sidedata
+
+    return sidedata_companion
+
+
+def _get_simple_sidedata_adder(srcrepo, destrepo):
+    """The simple version of the sidedata computation
+
+    It just computes it in the same thread on request"""
+
     def sidedatacompanion(revlog, rev):
         sidedata = {}
         if util.safehasattr(revlog, 'filteredrevs'):  # this is a changelog