Patchwork D8251: rust-status: traverse working directory in parallel

login
register
mail settings
Submitter phabricator
Date March 11, 2020, 3:52 p.m.
Message ID <1d974e2efe7aa81a7772d8bf7f4a5411@localhost.localdomain>
Download mbox | patch
Permalink /patch/45713/
State Not Applicable
Headers show

Comments

phabricator - March 11, 2020, 3:52 p.m.
Alphare updated this revision to Diff 20708.

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D8251?vs=20565&id=20708

BRANCH
  default

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8251/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8251

AFFECTED FILES
  rust/Cargo.lock
  rust/hg-core/Cargo.toml
  rust/hg-core/src/dirstate/status.rs

CHANGE DETAILS




To: Alphare, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/rust/hg-core/src/dirstate/status.rs b/rust/hg-core/src/dirstate/status.rs
--- a/rust/hg-core/src/dirstate/status.rs
+++ b/rust/hg-core/src/dirstate/status.rs
@@ -300,61 +300,55 @@ 
 }
 
 /// Dispatch a single entry (file, folder, symlink...) found during `traverse`.
-/// If the entry is a folder that needs to be traversed, it will be pushed into
-/// `work`.
+/// If the entry is a folder that needs to be traversed, it will be handled
+/// in a separate thread.
+
 fn handle_traversed_entry<'a>(
-    dir_entry: &DirEntry,
-    matcher: &(impl Matcher + Sync),
-    root_dir: impl AsRef<Path>,
-    dmap: &DirstateMap,
-    filename: impl AsRef<HgPath>,
-    old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
-    ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
-    dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+    scope: &rayon::Scope<'a>,
+    files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+    matcher: &'a (impl Matcher + Sync),
+    root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a,
+    dmap: &'a DirstateMap,
+    old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>,
+    ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+    dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
     options: StatusOptions,
-) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> {
+    filename: HgPathBuf,
+    dir_entry: DirEntry,
+) -> IoResult<()> {
     let file_type = dir_entry.file_type()?;
-    let filename = filename.as_ref();
-    let entry_option = dmap.get(filename);
+    let entry_option = dmap.get(&filename);
 
     if file_type.is_dir() {
-        // Do we need to traverse it?
-        if !ignore_fn(&filename) || options.list_ignored {
-            return traverse_dir(
-                matcher,
-                root_dir,
-                dmap,
-                filename.to_owned(),
-                &old_results,
-                ignore_fn,
-                dir_ignore_fn,
-                options,
-            );
-        }
-        // Nested `if` until `rust-lang/rust#53668` is stable
-        if let Some(entry) = entry_option {
-            // Used to be a file, is now a folder
-            if matcher.matches_everything() || matcher.matches(&filename) {
-                return Ok(vec![(
-                    Cow::Owned(filename.to_owned()),
-                    dispatch_missing(entry.state),
-                )]);
-            }
-        }
+        handle_traversed_dir(
+            scope,
+            files_sender,
+            matcher,
+            root_dir,
+            dmap,
+            old_results,
+            ignore_fn,
+            dir_ignore_fn,
+            options,
+            entry_option,
+            filename,
+        );
     } else if file_type.is_file() || file_type.is_symlink() {
         if let Some(entry) = entry_option {
             if matcher.matches_everything() || matcher.matches(&filename) {
                 let metadata = dir_entry.metadata()?;
-                return Ok(vec![(
-                    Cow::Owned(filename.to_owned()),
-                    dispatch_found(
-                        &filename,
-                        *entry,
-                        HgMetadata::from_metadata(metadata),
-                        &dmap.copy_map,
-                        options,
-                    ),
-                )]);
+                files_sender
+                    .send(Ok((
+                        filename.to_owned(),
+                        dispatch_found(
+                            &filename,
+                            *entry,
+                            HgMetadata::from_metadata(metadata),
+                            &dmap.copy_map,
+                            options,
+                        ),
+                    )))
+                    .unwrap();
             }
         } else if (matcher.matches_everything() || matcher.matches(&filename))
             && !ignore_fn(&filename)
@@ -363,53 +357,96 @@ 
                 && dir_ignore_fn(&filename)
             {
                 if options.list_ignored {
-                    return Ok(vec![(
-                        Cow::Owned(filename.to_owned()),
-                        Dispatch::Ignored,
-                    )]);
+                    files_sender
+                        .send(Ok((filename.to_owned(), Dispatch::Ignored)))
+                        .unwrap();
                 }
             } else {
-                return Ok(vec![(
-                    Cow::Owned(filename.to_owned()),
-                    Dispatch::Unknown,
-                )]);
+                files_sender
+                    .send(Ok((filename.to_owned(), Dispatch::Unknown)))
+                    .unwrap();
             }
         } else if ignore_fn(&filename) && options.list_ignored {
-            return Ok(vec![(
-                Cow::Owned(filename.to_owned()),
-                Dispatch::Ignored,
-            )]);
+            files_sender
+                .send(Ok((filename.to_owned(), Dispatch::Ignored)))
+                .unwrap();
         }
     } else if let Some(entry) = entry_option {
         // Used to be a file or a folder, now something else.
         if matcher.matches_everything() || matcher.matches(&filename) {
-            return Ok(vec![(
-                Cow::Owned(filename.to_owned()),
-                dispatch_missing(entry.state),
-            )]);
+            files_sender
+                .send(Ok((filename.to_owned(), dispatch_missing(entry.state))))
+                .unwrap();
         }
     }
-    return Ok(vec![]);
+
+    Ok(())
 }
 
-/// Decides whether the directory needs to be listed, and if so dispatches its
-/// entries
+/// A directory was found in the filesystem and needs to be traversed
+fn handle_traversed_dir<'a>(
+    scope: &rayon::Scope<'a>,
+    files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+    matcher: &'a (impl Matcher + Sync),
+    root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a,
+    dmap: &'a DirstateMap,
+    old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>,
+    ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+    dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+    options: StatusOptions,
+    entry_option: Option<&'a DirstateEntry>,
+    directory: HgPathBuf,
+) {
+    scope.spawn(move |_| {
+        // Nested `if` until `rust-lang/rust#53668` is stable
+        if let Some(entry) = entry_option {
+            // Used to be a file, is now a folder
+            if matcher.matches_everything() || matcher.matches(&directory) {
+                files_sender
+                    .send(Ok((
+                        directory.to_owned(),
+                        dispatch_missing(entry.state),
+                    )))
+                    .unwrap();
+            }
+        }
+        // Do we need to traverse it?
+        if !ignore_fn(&directory) || options.list_ignored {
+            traverse_dir(
+                files_sender,
+                matcher,
+                root_dir,
+                dmap,
+                directory,
+                &old_results,
+                ignore_fn,
+                dir_ignore_fn,
+                options,
+            )
+            .unwrap_or_else(|e| files_sender.send(Err(e)).unwrap())
+        }
+    });
+}
+
+/// Decides whether the directory needs to be listed, and if so handles the
+/// entries in a separate thread.
 fn traverse_dir<'a>(
-    matcher: &(impl Matcher + Sync),
-    root_dir: impl AsRef<Path>,
-    dmap: &DirstateMap,
-    path: impl AsRef<HgPath>,
+    files_sender: &crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+    matcher: &'a (impl Matcher + Sync),
+    root_dir: impl AsRef<Path> + Sync + Send + Copy,
+    dmap: &'a DirstateMap,
+    directory: impl AsRef<HgPath>,
     old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
     ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
     dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
     options: StatusOptions,
-) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> {
-    let directory = path.as_ref();
+) -> IoResult<()> {
+    let directory = directory.as_ref();
     if directory.as_bytes() == b".hg" {
-        return Ok(vec![]);
+        return Ok(());
     }
     let visit_entries = match matcher.visit_children_set(directory) {
-        VisitChildrenSet::Empty => return Ok(vec![]),
+        VisitChildrenSet::Empty => return Ok(()),
         VisitChildrenSet::This | VisitChildrenSet::Recursive => None,
         VisitChildrenSet::Set(set) => Some(set),
     };
@@ -420,49 +457,108 @@ 
     let entries = match list_directory(dir_path, skip_dot_hg) {
         Err(e) => match e.kind() {
             ErrorKind::NotFound | ErrorKind::PermissionDenied => {
-                return Ok(vec![(
-                    Cow::Owned(directory.to_owned()),
-                    Dispatch::Bad(BadMatch::OsError(
-                        // Unwrapping here is OK because the error always
-                        // is a real os error
-                        e.raw_os_error().unwrap(),
-                    )),
-                )]);
+                files_sender
+                    .send(Ok((
+                        directory.to_owned(),
+                        Dispatch::Bad(BadMatch::OsError(
+                            // Unwrapping here is OK because the error always
+                            // is a real os error
+                            e.raw_os_error().unwrap(),
+                        )),
+                    )))
+                    .unwrap();
+                return Ok(());
             }
             _ => return Err(e),
         },
         Ok(entries) => entries,
     };
 
-    let mut new_results = vec![];
-    for (filename, dir_entry) in entries {
-        if let Some(ref set) = visit_entries {
-            if !set.contains(filename.deref()) {
-                continue;
+    rayon::scope(|scope| -> IoResult<()> {
+        for (filename, dir_entry) in entries {
+            if let Some(ref set) = visit_entries {
+                if !set.contains(filename.deref()) {
+                    continue;
+                }
+            }
+            // TODO normalize
+            let filename = if directory.is_empty() {
+                filename.to_owned()
+            } else {
+                directory.join(&filename)
+            };
+
+            if !old_results.contains_key(filename.deref()) {
+                handle_traversed_entry(
+                    scope,
+                    files_sender,
+                    matcher,
+                    root_dir,
+                    dmap,
+                    old_results,
+                    ignore_fn,
+                    dir_ignore_fn,
+                    options,
+                    filename,
+                    dir_entry,
+                )?;
             }
         }
-        // TODO normalize
-        let filename = if directory.is_empty() {
-            filename.to_owned()
-        } else {
-            directory.join(&filename)
-        };
+        Ok(())
+    })
+}
+
+/// Walk the working directory recursively to look for changes compared to the
+/// current `DirstateMap`.
+///
+/// This takes a mutable reference to the results to account for the `extend`
+/// in timings
+fn traverse<'a>(
+    matcher: &'a (impl Matcher + Sync),
+    root_dir: impl AsRef<Path> + Sync + Send + Copy,
+    dmap: &'a DirstateMap,
+    path: impl AsRef<HgPath>,
+    old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
+    ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+    dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+    options: StatusOptions,
+    results: &mut Vec<(Cow<'a, HgPath>, Dispatch)>,
+) -> IoResult<()> {
+    let root_dir = root_dir.as_ref();
+
+    // The traversal is done in parallel, so use a channel to gather entries.
+    // `crossbeam::Sender` is `Send`, while `mpsc::Sender` is not.
+    let (files_transmitter, files_receiver) = crossbeam::channel::unbounded();
 
-        if !old_results.contains_key(filename.deref()) {
-            new_results.extend(handle_traversed_entry(
-                &dir_entry,
-                matcher,
-                root_dir.as_ref(),
-                &dmap,
-                &filename,
-                old_results,
-                ignore_fn,
-                dir_ignore_fn,
-                options,
-            )?);
-        }
-    }
-    Ok(new_results)
+    traverse_dir(
+        &files_transmitter,
+        matcher,
+        root_dir,
+        &dmap,
+        path,
+        &old_results,
+        &ignore_fn,
+        &dir_ignore_fn,
+        options,
+    )?;
+
+    // Disconnect the channel so the receiver stops waiting
+    drop(files_transmitter);
+
+    // TODO don't collect. Find a way of replicating the behavior of
+    // `itertools::process_results`, but for `rayon::ParallelIterator`
+    let new_results: IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> =
+        files_receiver
+            .into_iter()
+            .map(|item| {
+                let (f, d) = item?;
+                Ok((Cow::Owned(f), d))
+            })
+            .collect();
+
+    results.par_extend(new_results?);
+
+    Ok(())
 }
 
 /// Stat all entries in the `DirstateMap` and mark them for dispatch.
@@ -753,7 +849,7 @@ 
                     if options.list_ignored
                         || options.list_unknown && !dir_ignore_fn(&dir)
                     {
-                        results.par_extend(traverse_dir(
+                        traverse(
                             matcher,
                             root_dir,
                             &dmap,
@@ -762,7 +858,8 @@ 
                             &ignore_fn,
                             &dir_ignore_fn,
                             options,
-                        )?);
+                            &mut results,
+                        )?;
                     }
                 }
                 _ => unreachable!("There can only be directories in `work`"),
diff --git a/rust/hg-core/Cargo.toml b/rust/hg-core/Cargo.toml
--- a/rust/hg-core/Cargo.toml
+++ b/rust/hg-core/Cargo.toml
@@ -21,6 +21,7 @@ 
 regex = "1.1.0"
 twox-hash = "1.5.0"
 same-file = "1.0.6"
+crossbeam = "0.7.3"
 
 [dev-dependencies]
 clap = "*"
diff --git a/rust/Cargo.lock b/rust/Cargo.lock
--- a/rust/Cargo.lock
+++ b/rust/Cargo.lock
@@ -98,6 +98,28 @@ 
 ]
 
 [[package]]
+name = "crossbeam"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-channel"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
 name = "crossbeam-deque"
 version = "0.7.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -192,6 +214,7 @@ 
  "byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
  "cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
  "clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
  "hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
  "libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -227,6 +250,11 @@ 
 source = "registry+https://github.com/rust-lang/crates.io-index"
 
 [[package]]
+name = "maybe-uninit"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
 name = "memchr"
 version = "2.3.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -662,6 +690,8 @@ 
 "checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
 "checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
 "checksum cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfaf3847ab963e40c4f6dd8d6be279bdf74007ae2413786a0dcbb28c52139a95"
+"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
+"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
 "checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca"
 "checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac"
 "checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"
@@ -675,6 +705,7 @@ 
 "checksum hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "023b39be39e3a2da62a94feb433e91e8bcd37676fbc8bea371daf52b7a769a3e"
 "checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
 "checksum libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)" = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558"
+"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
 "checksum memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3197e20c7edb283f87c071ddfc7a2cca8f8e0b888c242959846a6fce03c72223"
 "checksum memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
 "checksum memoffset 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "75189eb85871ea5c2e2c15abbdd541185f63b408415e5051f5cac122d8c774b9"