@@ -300,61 +300,55 @@
}
/// Dispatch a single entry (file, folder, symlink...) found during `traverse`.
-/// If the entry is a folder that needs to be traversed, it will be pushed into
-/// `work`.
+/// If the entry is a folder that needs to be traversed, it will be handled
+/// in a separate thread.
+
fn handle_traversed_entry<'a>(
- dir_entry: &DirEntry,
- matcher: &(impl Matcher + Sync),
- root_dir: impl AsRef<Path>,
- dmap: &DirstateMap,
- filename: impl AsRef<HgPath>,
- old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
- ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
- dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ scope: &rayon::Scope<'a>,
+ files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a,
+ dmap: &'a DirstateMap,
+ old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>,
+ ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
options: StatusOptions,
-) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> {
+ filename: HgPathBuf,
+ dir_entry: DirEntry,
+) -> IoResult<()> {
let file_type = dir_entry.file_type()?;
- let filename = filename.as_ref();
- let entry_option = dmap.get(filename);
+ let entry_option = dmap.get(&filename);
if file_type.is_dir() {
- // Do we need to traverse it?
- if !ignore_fn(&filename) || options.list_ignored {
- return traverse_dir(
- matcher,
- root_dir,
- dmap,
- filename.to_owned(),
- &old_results,
- ignore_fn,
- dir_ignore_fn,
- options,
- );
- }
- // Nested `if` until `rust-lang/rust#53668` is stable
- if let Some(entry) = entry_option {
- // Used to be a file, is now a folder
- if matcher.matches_everything() || matcher.matches(&filename) {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- dispatch_missing(entry.state),
- )]);
- }
- }
+ handle_traversed_dir(
+ scope,
+ files_sender,
+ matcher,
+ root_dir,
+ dmap,
+ old_results,
+ ignore_fn,
+ dir_ignore_fn,
+ options,
+ entry_option,
+ filename,
+ );
} else if file_type.is_file() || file_type.is_symlink() {
if let Some(entry) = entry_option {
if matcher.matches_everything() || matcher.matches(&filename) {
let metadata = dir_entry.metadata()?;
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- dispatch_found(
- &filename,
- *entry,
- HgMetadata::from_metadata(metadata),
- &dmap.copy_map,
- options,
- ),
- )]);
+ files_sender
+ .send(Ok((
+ filename.to_owned(),
+ dispatch_found(
+ &filename,
+ *entry,
+ HgMetadata::from_metadata(metadata),
+ &dmap.copy_map,
+ options,
+ ),
+ )))
+ .unwrap();
}
} else if (matcher.matches_everything() || matcher.matches(&filename))
&& !ignore_fn(&filename)
@@ -363,53 +357,96 @@
&& dir_ignore_fn(&filename)
{
if options.list_ignored {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- Dispatch::Ignored,
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), Dispatch::Ignored)))
+ .unwrap();
}
} else {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- Dispatch::Unknown,
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), Dispatch::Unknown)))
+ .unwrap();
}
} else if ignore_fn(&filename) && options.list_ignored {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- Dispatch::Ignored,
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), Dispatch::Ignored)))
+ .unwrap();
}
} else if let Some(entry) = entry_option {
// Used to be a file or a folder, now something else.
if matcher.matches_everything() || matcher.matches(&filename) {
- return Ok(vec![(
- Cow::Owned(filename.to_owned()),
- dispatch_missing(entry.state),
- )]);
+ files_sender
+ .send(Ok((filename.to_owned(), dispatch_missing(entry.state))))
+ .unwrap();
}
}
- return Ok(vec![]);
+
+ Ok(())
}
-/// Decides whether the directory needs to be listed, and if so dispatches its
-/// entries
+/// A directory was found in the filesystem and might need to be traversed.
+fn handle_traversed_dir<'a>(
+ scope: &rayon::Scope<'a>,
+ files_sender: &'a crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy + 'a,
+ dmap: &'a DirstateMap,
+ old_results: &'a FastHashMap<Cow<HgPath>, Dispatch>,
+ ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ dir_ignore_fn: &'a (impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ options: StatusOptions,
+ entry_option: Option<&'a DirstateEntry>,
+ directory: HgPathBuf,
+) {
+ scope.spawn(move |_| {
+ // Nested `if` until `rust-lang/rust#53668` is stable
+ if let Some(entry) = entry_option {
+ // Used to be a file, is now a folder
+ if matcher.matches_everything() || matcher.matches(&directory) {
+ files_sender
+ .send(Ok((
+ directory.to_owned(),
+ dispatch_missing(entry.state),
+ )))
+ .unwrap();
+ }
+ }
+ // Do we need to traverse it?
+ if !ignore_fn(&directory) || options.list_ignored {
+ traverse_dir(
+ files_sender,
+ matcher,
+ root_dir,
+ dmap,
+ directory,
+ &old_results,
+ ignore_fn,
+ dir_ignore_fn,
+ options,
+ )
+ .unwrap_or_else(|e| files_sender.send(Err(e)).unwrap())
+ }
+ });
+}
+
+/// Decides whether the directory needs to be listed, and if so dispatches its
+/// entries; sub-directories are handed over to the `rayon` scope.
fn traverse_dir<'a>(
- matcher: &(impl Matcher + Sync),
- root_dir: impl AsRef<Path>,
- dmap: &DirstateMap,
- path: impl AsRef<HgPath>,
+ files_sender: &crossbeam::Sender<IoResult<(HgPathBuf, Dispatch)>>,
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy,
+ dmap: &'a DirstateMap,
+ directory: impl AsRef<HgPath>,
old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
options: StatusOptions,
-) -> IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> {
- let directory = path.as_ref();
+) -> IoResult<()> {
+ let directory = directory.as_ref();
if directory.as_bytes() == b".hg" {
- return Ok(vec![]);
+ return Ok(());
}
let visit_entries = match matcher.visit_children_set(directory) {
- VisitChildrenSet::Empty => return Ok(vec![]),
+ VisitChildrenSet::Empty => return Ok(()),
VisitChildrenSet::This | VisitChildrenSet::Recursive => None,
VisitChildrenSet::Set(set) => Some(set),
};
@@ -420,49 +457,108 @@
let entries = match list_directory(dir_path, skip_dot_hg) {
Err(e) => match e.kind() {
ErrorKind::NotFound | ErrorKind::PermissionDenied => {
- return Ok(vec![(
- Cow::Owned(directory.to_owned()),
- Dispatch::Bad(BadMatch::OsError(
- // Unwrapping here is OK because the error always
- // is a real os error
- e.raw_os_error().unwrap(),
- )),
- )]);
+ files_sender
+ .send(Ok((
+ directory.to_owned(),
+ Dispatch::Bad(BadMatch::OsError(
+ // Unwrapping here is OK because the error always
+ // is a real os error
+ e.raw_os_error().unwrap(),
+ )),
+ )))
+ .unwrap();
+ return Ok(());
}
_ => return Err(e),
},
Ok(entries) => entries,
};
- let mut new_results = vec![];
- for (filename, dir_entry) in entries {
- if let Some(ref set) = visit_entries {
- if !set.contains(filename.deref()) {
- continue;
+ rayon::scope(|scope| -> IoResult<()> {
+ for (filename, dir_entry) in entries {
+ if let Some(ref set) = visit_entries {
+ if !set.contains(filename.deref()) {
+ continue;
+ }
+ }
+ // TODO normalize
+ let filename = if directory.is_empty() {
+ filename.to_owned()
+ } else {
+ directory.join(&filename)
+ };
+
+ if !old_results.contains_key(filename.deref()) {
+ handle_traversed_entry(
+ scope,
+ files_sender,
+ matcher,
+ root_dir,
+ dmap,
+ old_results,
+ ignore_fn,
+ dir_ignore_fn,
+ options,
+ filename,
+ dir_entry,
+ )?;
}
}
- // TODO normalize
- let filename = if directory.is_empty() {
- filename.to_owned()
- } else {
- directory.join(&filename)
- };
+ Ok(())
+ })
+}
+
+/// Walk the working directory recursively to look for changes compared to the
+/// current `DirstateMap`.
+///
+/// This takes a mutable reference to the results to account for the `extend`
+/// in timings
+fn traverse<'a>(
+ matcher: &'a (impl Matcher + Sync),
+ root_dir: impl AsRef<Path> + Sync + Send + Copy,
+ dmap: &'a DirstateMap,
+ path: impl AsRef<HgPath>,
+ old_results: &FastHashMap<Cow<'a, HgPath>, Dispatch>,
+ ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ dir_ignore_fn: &(impl for<'r> Fn(&'r HgPath) -> bool + Sync),
+ options: StatusOptions,
+ results: &mut Vec<(Cow<'a, HgPath>, Dispatch)>,
+) -> IoResult<()> {
+ let root_dir = root_dir.as_ref();
+
+ // The traversal is done in parallel, so use a channel to gather entries.
+ // `crossbeam::Sender` is `Sync`, while `mpsc::Sender` is not.
+ let (files_transmitter, files_receiver) = crossbeam::channel::unbounded();
- if !old_results.contains_key(filename.deref()) {
- new_results.extend(handle_traversed_entry(
- &dir_entry,
- matcher,
- root_dir.as_ref(),
- &dmap,
- &filename,
- old_results,
- ignore_fn,
- dir_ignore_fn,
- options,
- )?);
- }
- }
- Ok(new_results)
+ traverse_dir(
+ &files_transmitter,
+ matcher,
+ root_dir,
+ &dmap,
+ path,
+ &old_results,
+ &ignore_fn,
+ &dir_ignore_fn,
+ options,
+ )?;
+
+ // Disconnect the channel so the receiver stops waiting
+ drop(files_transmitter);
+
+ // TODO don't collect. Find a way of replicating the behavior of
+ // `itertools::process_results`, but for `rayon::ParallelIterator`
+ let new_results: IoResult<Vec<(Cow<'a, HgPath>, Dispatch)>> =
+ files_receiver
+ .into_iter()
+ .map(|item| {
+ let (f, d) = item?;
+ Ok((Cow::Owned(f), d))
+ })
+ .collect();
+
+ results.par_extend(new_results?);
+
+ Ok(())
}
/// Stat all entries in the `DirstateMap` and mark them for dispatch.
@@ -753,7 +849,7 @@
if options.list_ignored
|| options.list_unknown && !dir_ignore_fn(&dir)
{
- results.par_extend(traverse_dir(
+ traverse(
matcher,
root_dir,
&dmap,
@@ -762,7 +858,8 @@
&ignore_fn,
&dir_ignore_fn,
options,
- )?);
+ &mut results,
+ )?;
}
}
_ => unreachable!("There can only be directories in `work`"),
@@ -21,6 +21,7 @@
regex = "1.1.0"
twox-hash = "1.5.0"
same-file = "1.0.6"
+crossbeam = "0.7.3"
[dev-dependencies]
clap = "*"
@@ -98,6 +98,28 @@
]
[[package]]
+name = "crossbeam"
+version = "0.7.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "cfg-if 0.1.10 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "crossbeam-channel"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "crossbeam-utils 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
name = "crossbeam-deque"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -192,6 +214,7 @@
"byteorder 1.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"cc 1.0.50 (registry+https://github.com/rust-lang/crates.io-index)",
"clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)",
"libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -235,6 +258,11 @@
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
+name = "maybe-uninit"
+version = "2.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
+[[package]]
name = "memchr"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -670,6 +698,8 @@
"checksum clap 2.33.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5067f5bb2d80ef5d68b4c87db81601f0b75bca627bc2ef76b141d7b846a3c6d9"
"checksum cloudabi 0.0.3 (registry+https://github.com/rust-lang/crates.io-index)" = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
"checksum cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)" = "bfaf3847ab963e40c4f6dd8d6be279bdf74007ae2413786a0dcbb28c52139a95"
+"checksum crossbeam 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)" = "69323bff1fb41c635347b8ead484a5ca6c3f11914d784170b158d8449ab07f8e"
+"checksum crossbeam-channel 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)" = "cced8691919c02aac3cb0a1bc2e9b73d89e832bf9a06fc579d4e71b68a2da061"
"checksum crossbeam-deque 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)" = "c3aa945d63861bfe624b55d153a39684da1e8c0bc8fba932f7ee3a3c16cea3ca"
"checksum crossbeam-epoch 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)" = "5064ebdbf05ce3cb95e45c8b086f72263f4166b29b97f6baff7ef7fe047b55ac"
"checksum crossbeam-queue 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c695eeca1e7173472a32221542ae469b3e9aac3a4fc81f7696bcad82029493db"
@@ -683,6 +713,7 @@
"checksum hex 0.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "023b39be39e3a2da62a94feb433e91e8bcd37676fbc8bea371daf52b7a769a3e"
"checksum lazy_static 1.4.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
"checksum libc 0.2.66 (registry+https://github.com/rust-lang/crates.io-index)" = "d515b1f41455adea1313a4a2ac8a8a477634fbae63cc6100e3aebb207ce61558"
+"checksum maybe-uninit 2.0.0 (registry+https://github.com/rust-lang/crates.io-index)" = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
"checksum memchr 2.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "3197e20c7edb283f87c071ddfc7a2cca8f8e0b888c242959846a6fce03c72223"
"checksum memmap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6585fd95e7bb50d6cc31e20d4cf9afb4e2ba16c5846fc76793f11218da9c475b"
"checksum memoffset 0.5.3 (registry+https://github.com/rust-lang/crates.io-index)" = "75189eb85871ea5c2e2c15abbdd541185f63b408415e5051f5cac122d8c774b9"