Patchwork D3846: fix: use a worker pool to parallelize running tools

Submitter phabricator
Date June 27, 2018, 1:07 a.m.
Message ID <differential-rev-PHID-DREV-52wq56i5msbpswnbh25w-req@phab.mercurial-scm.org>
Permalink /patch/32454/
State Superseded

Comments

hooper created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This is important for usability when tools are slow or numerous.
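Each (rev, path) work item is independent, which is what makes a worker pool safe here. As a minimal sketch of the same idea using only the standard library (not Mercurial's internal `worker` API, which forks worker processes and streams results back over a pipe; `runtool` below is a hypothetical stand-in for piping file content through an external tool):

```python
# Sketch: run independent per-file "fix" jobs in parallel.
# `runtool` is a hypothetical placeholder for invoking a fixer tool.
from concurrent.futures import ThreadPoolExecutor


def runtool(item):
    rev, path, data = item
    # A real implementation would pipe `data` through the configured
    # tool's stdin and read the fixed content from its stdout.
    return (rev, path, data.upper())


def fixall(workqueue):
    # map() preserves input order, so results come back in the same
    # (grouped-by-revision) order the queue was built in, even though
    # the jobs themselves run concurrently.
    with ThreadPoolExecutor() as pool:
        return list(pool.map(runtool, workqueue))
```

A thread pool suffices when the actual work is an external subprocess (the GIL is released while waiting on it); Mercurial's `worker.worker` uses forked processes instead.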

REPOSITORY
  rHG Mercurial

REVISION DETAIL
  https://phab.mercurial-scm.org/D3846

AFFECTED FILES
  hgext/fix.py

To: hooper, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/hgext/fix.py b/hgext/fix.py
--- a/hgext/fix.py
+++ b/hgext/fix.py
@@ -70,6 +70,7 @@ 
     registrar,
     scmutil,
     util,
+    worker,
 )
 
 # Note for extension authors: ONLY specify testedwith = 'ships-with-hg-core' for
@@ -138,19 +139,40 @@ 
         basectxs = getbasectxs(repo, opts, revstofix)
         workqueue, numitems = getworkqueue(ui, repo, pats, opts, revstofix,
                                            basectxs)
+        fixers = getfixers(ui)
+
+        # There are no data dependencies between the workers fixing each file
+        # revision, so we can use all available parallelism.
+        def getfixes(items):
+            for rev, path in items:
+                ctx = repo[rev]
+                olddata = ctx[path].data()
+                newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
+                # Don't waste memory/time passing unchanged content back, but
+                # produce one result per item either way.
+                yield (rev, path, newdata if newdata != olddata else None)
+        results = worker.worker(ui, 1.0, getfixes, tuple(), workqueue)
+
+        # We have to hold on to the data for each successor revision in memory
+        # until all its parents are committed. We ensure this by committing and
+        # freeing memory for the revisions in some topological order. This
+        # leaves a little bit of memory efficiency on the table, but also makes
+        # the tests deterministic. It might also be considered a feature since
+        # it makes the results more easily reproducible.
         filedata = collections.defaultdict(dict)
         replacements = {}
-        fixers = getfixers(ui)
-        # Some day this loop can become a worker pool, but for now it's easier
-        # to fix everything serially in topological order.
-        for rev, path in sorted(workqueue):
-            ctx = repo[rev]
-            olddata = ctx[path].data()
-            newdata = fixfile(ui, opts, fixers, ctx, path, basectxs[rev])
-            if newdata != olddata:
+        commitorder = sorted(revstofix, reverse=True)
+        for rev, path, newdata in results:
+            if newdata is not None:
                 filedata[rev][path] = newdata
             numitems[rev] -= 1
-            if not numitems[rev]:
+            # Apply the fixes for this and any other revisions that are ready
+            # and sitting at the front of the queue. Using a loop here prevents
+            # the queue from being blocked by the first revision to be ready out
+            # of order.
+            while commitorder and not numitems[commitorder[-1]]:
+                rev = commitorder.pop()
+                ctx = repo[rev]
                 if rev == wdirrev:
                     writeworkingdir(repo, ctx, filedata[rev], replacements)
                 else:
@@ -168,11 +190,19 @@ 
     topological order. Each work item represents a file in the working copy or
     in some revision that should be fixed and written back to the working copy
     or into a replacement revision.
+
+    Work items for the same revision are grouped together, so that a worker
+    pool starting with the first N items in parallel is likely to finish the
+    first revision's work before other revisions. This can allow us to write
+    the result to disk and reduce memory footprint. At time of writing, the
+    partition strategy in worker.py seems favorable to this. We also sort the
+    items by ascending revision number to match the order in which we commit
+    the fixes later.
     """
     workqueue = []
     numitems = collections.defaultdict(int)
     maxfilesize = ui.configbytes('fix', 'maxfilesize')
-    for rev in revstofix:
+    for rev in sorted(revstofix):
         fixctx = repo[rev]
         match = scmutil.match(fixctx, pats, opts)
         for path in pathstofix(ui, repo, pats, opts, match, basectxs[rev],
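The commit-ordering loop in the patch (drain every revision whose work items have all arrived, smallest revision first, so one early finisher cannot jump the queue) can be exercised in isolation. A standalone sketch of that bookkeeping, mirroring the patch's `numitems`/`commitorder` structure but with hypothetical inputs:

```python
import collections


def drainresults(results, numitems):
    """Yield revisions in ascending order once all their items arrive.

    `results` is an iterable of (rev, path, newdata) tuples; `numitems`
    maps each revision to its outstanding work-item count, as in the
    patch above.
    """
    filedata = collections.defaultdict(dict)
    # Reverse-sorted so the smallest revision sits at the end, where
    # pop() is cheap.
    commitorder = sorted(numitems, reverse=True)
    for rev, path, newdata in results:
        if newdata is not None:
            filedata[rev][path] = newdata
        numitems[rev] -= 1
        # Commit every revision now sitting at the front of the queue
        # with no outstanding items; the while loop keeps the queue from
        # stalling behind a revision that finished out of order.
        while commitorder and not numitems[commitorder[-1]]:
            yield commitorder.pop()
```

With results arriving out of order, revision 2's data is held in memory until revision 1 completes, then both are committed in ascending order.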