Patchwork D9684: copies-rust: move CPU-heavy Rust processing into a child thread

login
register
mail settings
Submitter phabricator
Date Jan. 6, 2021, 2:13 p.m.
Message ID <differential-rev-PHID-DREV-bm7fh3betwc7bnmbk7ui-req@mercurial-scm.org>
Download mbox | patch
Permalink /patch/47997/
State New
Headers show

Comments

phabricator - Jan. 6, 2021, 2:13 p.m.
SimonSapin created this revision.
Herald added a reviewer: hg-reviewers.
Herald added a subscriber: mercurial-patches.

REVISION SUMMARY
  … that runs in parallel with the parent thread fetching data.
  This can be disabled through a new config. CLI example:
  
    hg --config=devel.copy-tracing.multi-thread=no
  
  For now both threads use the GIL, later commits will reduce this.

REPOSITORY
  rHG Mercurial

BRANCH
  default

REVISION DETAIL
  https://phab.mercurial-scm.org/D9684

AFFECTED FILES
  mercurial/configitems.py
  mercurial/copies.py
  rust/Cargo.lock
  rust/hg-cpython/Cargo.toml
  rust/hg-cpython/src/copy_tracing.rs

CHANGE DETAILS




To: SimonSapin, #hg-reviewers
Cc: mercurial-patches, mercurial-devel

Patch

diff --git a/rust/hg-cpython/src/copy_tracing.rs b/rust/hg-cpython/src/copy_tracing.rs
--- a/rust/hg-cpython/src/copy_tracing.rs
+++ b/rust/hg-cpython/src/copy_tracing.rs
@@ -22,6 +22,7 @@ 
     children_count: PyDict,
     target_rev: Revision,
     rev_info: PyObject,
+    multi_thread: bool,
 ) -> PyResult<PyDict> {
     let children_count = children_count
         .items(py)
@@ -42,20 +43,81 @@ 
         Ok((rev, p1, p2, opt_bytes))
     });
 
-    let mut combine_changeset_copies =
-        CombineChangesetCopies::new(children_count);
+    let path_copies = if !multi_thread {
+        let mut combine_changeset_copies =
+            CombineChangesetCopies::new(children_count);
+
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+            let files = match &opt_bytes {
+                Some(bytes) => ChangedFiles::new(bytes.data(py)),
+                // Python None was extracted to Option::None,
+                // meaning there was no copy data.
+                None => ChangedFiles::new_empty(),
+            };
+
+            combine_changeset_copies.add_revision(rev, p1, p2, files)
+        }
+        combine_changeset_copies.finish(target_rev)
+    } else {
+        // Use a bounded channel to provide back-pressure:
+        // if the child thread is slower to process revisions than this thread
+        // is to gather data for them, an unbounded channel would keep
+        // growing and eat memory.
+        //
+        // TODO: tweak the bound?
+        let (rev_info_sender, rev_info_receiver) =
+            crossbeam_channel::bounded::<RevInfo>(1000);
 
-    for rev_info in revs_info {
-        let (rev, p1, p2, opt_bytes) = rev_info?;
-        let files = match &opt_bytes {
-            Some(bytes) => ChangedFiles::new(bytes.data(py)),
-            // value was presumably None, meaning they was no copy data.
-            None => ChangedFiles::new_empty(),
-        };
+        // Start a thread that does CPU-heavy processing in parallel with the
+        // loop below.
+        //
+        // If the parent thread panics, `rev_info_sender` will be dropped and
+        // “disconnected”. `rev_info_receiver` will be notified of this and
+        // exit its own loop.
+        let thread = std::thread::spawn(move || {
+            let mut combine_changeset_copies =
+                CombineChangesetCopies::new(children_count);
+            for (rev, p1, p2, opt_bytes) in rev_info_receiver {
+                let gil = Python::acquire_gil();
+                let py = gil.python();
+                let files = match &opt_bytes {
+                    Some(raw) => ChangedFiles::new(raw.data(py)),
+                    // Python None was extracted to Option::None,
+                    // meaning there was no copy data.
+                    None => ChangedFiles::new_empty(),
+                };
+                combine_changeset_copies.add_revision(rev, p1, p2, files)
+            }
+
+            combine_changeset_copies.finish(target_rev)
+        });
 
-        combine_changeset_copies.add_revision(rev, p1, p2, files)
-    }
-    let path_copies = combine_changeset_copies.finish(target_rev);
+        for rev_info in revs_info {
+            let (rev, p1, p2, opt_bytes) = rev_info?;
+
+            // We’d prefer to avoid the child thread calling into Python code,
+            // but this avoids a potential deadlock on the GIL if it does:
+            py.allow_threads(|| {
+                rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
+                    "combine_changeset_copies: channel is disconnected",
+                );
+            });
+        }
+        // We’d prefer to avoid the child thread calling into Python code,
+        // but this avoids a potential deadlock on the GIL if it does:
+        py.allow_threads(|| {
+            // Disconnect the channel to signal the child thread to stop:
+            // the `for … in rev_info_receiver` loop will end.
+            drop(rev_info_sender);
+
+            // Wait for the child thread to stop, and propagate any panic.
+            thread.join().unwrap_or_else(|panic_payload| {
+                std::panic::resume_unwind(panic_payload)
+            })
+        })
+    };
+
     let out = PyDict::new(py);
     for (dest, source) in path_copies.into_iter() {
         out.set_item(
@@ -84,7 +146,8 @@ 
                 revs: PyList,
                 children: PyDict,
                 target_rev: Revision,
-                rev_info: PyObject
+                rev_info: PyObject,
+                multi_thread: bool
             )
         ),
     )?;
diff --git a/rust/hg-cpython/Cargo.toml b/rust/hg-cpython/Cargo.toml
--- a/rust/hg-cpython/Cargo.toml
+++ b/rust/hg-cpython/Cargo.toml
@@ -22,6 +22,7 @@ 
 python3-bin = ["cpython/python3-sys"]
 
 [dependencies]
+crossbeam-channel = "0.4"
 hg-core = { path = "../hg-core"}
 libc = '*'
 log = "0.4.8"
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -306,6 +306,7 @@ 
 version = "0.1.0"
 dependencies = [
  "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "hg-core 0.1.0",
  "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/mercurial/copies.py b/mercurial/copies.py
--- a/mercurial/copies.py
+++ b/mercurial/copies.py
@@ -267,6 +267,7 @@ 
     revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()])
     roots = set()
     has_graph_roots = False
+    multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread')
 
     # iterate over `only(B, A)`
     for r in revs:
@@ -314,7 +315,7 @@ 
                     children_count[p] += 1
         revinfo = _revinfo_getter(repo, match)
         return _combine_changeset_copies(
-            revs, children_count, b.rev(), revinfo, match, isancestor
+            revs, children_count, b.rev(), revinfo, match, isancestor, multi_thread
         )
     else:
         # When not using side-data, we will process the edges "from" the parent.
@@ -339,7 +340,7 @@ 
 
 
 def _combine_changeset_copies(
-    revs, children_count, targetrev, revinfo, match, isancestor
+    revs, children_count, targetrev, revinfo, match, isancestor, multi_thread
 ):
     """combine the copies information for each item of iterrevs
 
@@ -356,7 +357,7 @@ 
 
     if rustmod is not None:
         final_copies = rustmod.combine_changeset_copies(
-            list(revs), children_count, targetrev, revinfo
+            list(revs), children_count, targetrev, revinfo, multi_thread
         )
     else:
         isancestor = cached_is_ancestor(isancestor)
diff --git a/mercurial/configitems.py b/mercurial/configitems.py
--- a/mercurial/configitems.py
+++ b/mercurial/configitems.py
@@ -687,6 +687,11 @@ 
 )
 coreconfigitem(
     b'devel',
+    b'copy-tracing.multi-thread',
+    default=True,
+)
+coreconfigitem(
+    b'devel',
     b'debug.extensions',
     default=False,
 )