Patchwork D9686: copies-rust: send PyBytes values back be dropped ino the parent thread

login
register
mail settings
Submitter phabricator
Date Jan. 6, 2021, 2:12 p.m.
Message ID <differential-rev-PHID-DREV-o6fgled4lkhfri7zsw7d-req@mercurial-scm.org>
Download mbox | patch
Permalink /patch/47996/
State New
Headers show

Comments

phabricator - Jan. 6, 2021, 2:12 p.m.
SimonSapin created this revision.
Herald added a reviewer: hg-reviewers.
Herald added a subscriber: mercurial-patches.

REVISION SUMMARY
  … instead of acquiring the GIL in the Rust thread in the Drop impl
  
  This commit is based on the premise that crossbeam-channel
  with unbounded send and non-blocking receive is faster than
  a contended GIL, but that remains to be measured.

REPOSITORY
  rHG Mercurial

BRANCH
  default

REVISION DETAIL
  https://phab.mercurial-scm.org/D9686

AFFECTED FILES
  rust/hg-cpython/src/copy_tracing.rs

CHANGE DETAILS




To: SimonSapin, #hg-reviewers
Cc: mercurial-patches, mercurial-devel

Patch

diff --git a/rust/hg-cpython/src/copy_tracing.rs b/rust/hg-cpython/src/copy_tracing.rs
--- a/rust/hg-cpython/src/copy_tracing.rs
+++ b/rust/hg-cpython/src/copy_tracing.rs
@@ -1,6 +1,7 @@ 
 use cpython::ObjectProtocol;
 use cpython::PyBytes;
 use cpython::PyDict;
+use cpython::PyDrop;
 use cpython::PyList;
 use cpython::PyModule;
 use cpython::PyObject;
@@ -58,6 +59,10 @@ 
             // alive, and the returned slice borrows `self`.
             unsafe { &*self.data }
         }
+
+        pub fn unwrap(self) -> PyBytes {
+            self.keep_alive
+        }
     }
 }
 
@@ -93,7 +98,8 @@ 
             Ok((rev, p1, p2, opt_bytes))
         });
 
-    let path_copies = if !multi_thread {
+    let path_copies;
+    if !multi_thread {
         let mut combine_changeset_copies =
             CombineChangesetCopies::new(children_count);
 
@@ -108,7 +114,7 @@ 
 
             combine_changeset_copies.add_revision(rev, p1, p2, files)
         }
-        combine_changeset_copies.finish(target_rev)
+        path_copies = combine_changeset_copies.finish(target_rev)
     } else {
         // Use a bounded channel to provide back-pressure:
         // if the child thread is slower to process revisions than this thread
@@ -119,6 +125,13 @@ 
         let (rev_info_sender, rev_info_receiver) =
             crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000);
 
+        // This channel (going the other way around) however is unbounded.
+        // If they were both bounded, there might potentially be deadlocks
+        // where both channels are full and both threads are waiting on each
+        // other.
+        let (pybytes_sender, pybytes_receiver) =
+            crossbeam_channel::unbounded();
+
         // Start a thread that does CPU-heavy processing in parallel with the
         // loop below.
         //
@@ -135,10 +148,20 @@ 
                     // meaning there was no copy data.
                     None => ChangedFiles::new_empty(),
                 };
-                combine_changeset_copies.add_revision(rev, p1, p2, files)
+                combine_changeset_copies.add_revision(rev, p1, p2, files);
 
-                // The GIL is (still) implicitly acquired here through
-                // `impl Drop for PyBytes`.
+                // Send `PyBytes` back to the parent thread so the parent
+                // thread can drop it. Otherwise the GIL would be implicitly
+                // acquired here through `impl Drop for PyBytes`.
+                if let Some(bytes) = opt_bytes {
+                    if let Err(_) = pybytes_sender.send(bytes.unwrap()) {
+                        // The channel is disconnected, meaning the parent
+                        // thread panicked or returned
+                        // early through
+                        // `?` to propagate a Python exception.
+                        break;
+                    }
+                }
             }
 
             combine_changeset_copies.finish(target_rev)
@@ -155,10 +178,15 @@ 
                     "combine_changeset_copies: channel is disconnected",
                 );
             });
+
+            // Drop anything in the channel, without blocking
+            for pybytes in pybytes_receiver.try_iter() {
+                pybytes.release_ref(py)
+            }
         }
         // We’d prefer to avoid the child thread calling into Python code,
         // but this avoids a potential deadlock on the GIL if it does:
-        py.allow_threads(|| {
+        path_copies = py.allow_threads(|| {
             // Disconnect the channel to signal the child thread to stop:
             // the `for … in rev_info_receiver` loop will end.
             drop(rev_info_sender);
@@ -167,7 +195,12 @@ 
             thread.join().unwrap_or_else(|panic_payload| {
                 std::panic::resume_unwind(panic_payload)
             })
-        })
+        });
+
+        // Drop anything left in the channel
+        for pybytes in pybytes_receiver.iter() {
+            pybytes.release_ref(py)
+        }
     };
 
     let out = PyDict::new(py);