Patchwork D8360: rust-chg: spawn server process if not running

login
register
mail settings
Submitter phabricator
Date April 2, 2020, 11:12 a.m.
Message ID <differential-rev-PHID-DREV-cknzlus5d2oecrbqmfx5-req@mercurial-scm.org>
Download mbox | patch
Permalink /patch/45992/
State Superseded
Headers show

Comments

phabricator - April 2, 2020, 11:12 a.m.
yuja created this revision.
Herald added a subscriber: mercurial-devel.
Herald added a reviewer: hg-reviewers.

REVISION SUMMARY
  This is the minimal reimplementation of gethgcmd(), execcmdserver(),
  retryconnectcmdserver(), and connectcmdserver() in chg.c.
  
  No config validation is implemented yet. And some Py3 workarounds would
  be missing as this is the code I wrote in 2018.

REPOSITORY
  rHG Mercurial

BRANCH
  default

REVISION DETAIL
  https://phab.mercurial-scm.org/D8360

AFFECTED FILES
  rust/chg/src/lib.rs
  rust/chg/src/locator.rs
  rust/chg/src/main.rs

CHANGE DETAILS




To: yuja, #hg-reviewers
Cc: mercurial-devel
Raphaël Gomès - April 2, 2020, 12:26 p.m.
Aside from the outdated futures and Rust edition that Yuya already said 
are temporary, this looks good!

On 4/2/20 1:12 PM, yuja (Yuya Nishihara) wrote:
> yuja created this revision.
> Herald added a subscriber: mercurial-devel.
> Herald added a reviewer: hg-reviewers.
>
> REVISION SUMMARY
>    This is the minimal reimplementation of gethgcmd(), execcmdserver(),
>    retryconnectcmdserver(), and connectcmdserver() in chg.c.
>    
>    No config validation is implemented yet. And some Py3 workarounds would
>    be missing as this is the code I wrote in 2018.
>
> REPOSITORY
>    rHG Mercurial
>
> BRANCH
>    default
>
> REVISION DETAIL
>    https://phab.mercurial-scm.org/D8360
>
> AFFECTED FILES
>    rust/chg/src/lib.rs
>    rust/chg/src/locator.rs
>    rust/chg/src/main.rs
>
> CHANGE DETAILS
>
> diff --git a/rust/chg/src/main.rs b/rust/chg/src/main.rs
> --- a/rust/chg/src/main.rs
> +++ b/rust/chg/src/main.rs
> @@ -9,7 +9,7 @@
>   extern crate tokio;
>   extern crate tokio_hglib;
>   
> -use chg::locator;
> +use chg::locator::Locator;
>   use chg::procutil;
>   use chg::{ChgClientExt, ChgUiHandler};
>   use futures::sync::oneshot;
> @@ -18,7 +18,6 @@
>   use std::process;
>   use std::time::Instant;
>   use tokio::prelude::*;
> -use tokio_hglib::UnixClient;
>   
>   struct DebugLogger {
>       start: Instant,
> @@ -64,6 +63,8 @@
>           log::set_max_level(log::LevelFilter::Debug);
>       }
>   
> +    // TODO: add loop detection by $CHGINTERNALMARK
> +
>       let code = run().unwrap_or_else(|err| {
>           writeln!(io::stderr(), "chg: abort: {}", err).unwrap_or(());
>           255
> @@ -73,11 +74,12 @@
>   
>   fn run() -> io::Result<i32> {
>       let current_dir = env::current_dir()?;
> -    let sock_path = locator::prepare_server_socket_path()?;
> +    let loc = Locator::prepare_from_env()?;
>       let handler = ChgUiHandler::new();
>       let (result_tx, result_rx) = oneshot::channel();
> -    let fut = UnixClient::connect(sock_path)
> -        .and_then(|client| client.set_current_dir(current_dir))
> +    let fut = loc
> +        .connect()
> +        .and_then(|(_, client)| client.set_current_dir(current_dir))
>           .and_then(|client| client.attach_io(io::stdin(), io::stdout(), io::stderr()))
>           .and_then(|client| {
>               let pid = client.server_spec().process_id.unwrap();
> diff --git a/rust/chg/src/locator.rs b/rust/chg/src/locator.rs
> --- a/rust/chg/src/locator.rs
> +++ b/rust/chg/src/locator.rs
> @@ -5,6 +5,7 @@
>   
>   //! Utility for locating command-server process.
>   
> +use futures::future::{self, Either, Loop};
>   use std::env;
>   use std::ffi::{OsStr, OsString};
>   use std::fs::{self, DirBuilder};
> @@ -12,8 +13,12 @@
>   use std::os::unix::ffi::{OsStrExt, OsStringExt};
>   use std::os::unix::fs::{DirBuilderExt, MetadataExt};
>   use std::path::{Path, PathBuf};
> -use std::process;
> +use std::process::{self, Command};
>   use std::time::Duration;
> +use tokio::prelude::*;
> +use tokio_hglib::UnixClient;
> +use tokio_process::{Child, CommandExt};
> +use tokio_timer;
>   
>   use super::procutil;
>   
> @@ -52,13 +57,113 @@
>           buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
>           OsString::from_vec(buf).into()
>       }
> +
> +    /// Connects to the server.
> +    ///
> +    /// The server process will be spawned if not running.
> +    pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
> +        self.try_connect()
> +    }
> +
> +    /// Tries to connect to the existing server, or spawns new if not running.
> +    fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
> +        debug!("try connect to {}", self.base_sock_path.display());
> +        UnixClient::connect(self.base_sock_path.clone()).then(|res| match res {
> +            Ok(client) => Either::A(future::ok((self, client))),
> +            Err(_) => Either::B(self.spawn_connect()),
> +        })
> +    }
> +
> +    /// Spawns new server process and connects to it.
> +    ///
> +    /// The server will be spawned at the current working directory, then
> +    /// chdir to "/", so that the server will load configs from the target
> +    /// repository.
> +    fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
> +        let sock_path = self.temp_sock_path();
> +        debug!("start cmdserver at {}", sock_path.display());
> +        Command::new(&self.hg_command)
> +            .arg("serve")
> +            .arg("--cmdserver")
> +            .arg("chgunix")
> +            .arg("--address")
> +            .arg(&sock_path)
> +            .arg("--daemon-postexec")
> +            .arg("chdir:/")
> +            .current_dir(&self.current_dir)
> +            .env_clear()
> +            .envs(self.env_vars.iter().cloned())
> +            .env("CHGINTERNALMARK", "")
> +            .spawn_async()
> +            .into_future()
> +            .and_then(|server| self.connect_spawned(server, sock_path))
> +            .and_then(|(loc, client, sock_path)| {
> +                debug!(
> +                    "rename {} to {}",
> +                    sock_path.display(),
> +                    loc.base_sock_path.display()
> +                );
> +                fs::rename(&sock_path, &loc.base_sock_path)?;
> +                Ok((loc, client))
> +            })
> +    }
> +
> +    /// Tries to connect to the just spawned server repeatedly until timeout
> +    /// exceeded.
> +    fn connect_spawned(
> +        self,
> +        server: Child,
> +        sock_path: PathBuf,
> +    ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
> +        debug!("try connect to {} repeatedly", sock_path.display());
> +        let connect = future::loop_fn(sock_path, |sock_path| {
> +            UnixClient::connect(sock_path.clone()).then(|res| {
> +                match res {
> +                    Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
> +                    Err(_) => {

Maybe this could be later implemented with a Waker instead of 
delay-based polling with a futures 0.3 rewrite?

> +                        // try again with slight delay
> +                        let fut = tokio_timer::sleep(Duration::from_millis(10))
> +                            .map(|()| Loop::Continue(sock_path))
> +                            .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
> +                        Either::B(fut)
> +                    }
> +                }
> +            })
> +        });
> +
> +        // waits for either connection established or server failed to start
> +        connect
> +            .select2(server)
> +            .map_err(|res| res.split().0)
> +            .timeout(self.timeout)
> +            .map_err(|err| {
> +                err.into_inner().unwrap_or_else(|| {
> +                    io::Error::new(
> +                        io::ErrorKind::TimedOut,
> +                        "timed out while connecting to server",
> +                    )
> +                })
> +            })
> +            .and_then(|res| {
> +                match res {
> +                    Either::A(((client, sock_path), server)) => {
> +                        server.forget(); // continue to run in background
> +                        Ok((self, client, sock_path))
> +                    }
> +                    Either::B((st, _)) => Err(io::Error::new(
> +                        io::ErrorKind::Other,
> +                        format!("server exited too early: {}", st),
> +                    )),
> +                }
> +            })
> +    }
>   }
>   
>   /// Determines the server socket to connect to.
>   ///
>   /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
>   /// as necessary.
> -pub fn prepare_server_socket_path() -> io::Result<PathBuf> {
> +fn prepare_server_socket_path() -> io::Result<PathBuf> {
>       if let Some(s) = env::var_os("CHGSOCKNAME") {
>           Ok(PathBuf::from(s))
>       } else {
> diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs
> --- a/rust/chg/src/lib.rs
> +++ b/rust/chg/src/lib.rs
> @@ -7,9 +7,12 @@
>   #[macro_use]
>   extern crate futures;
>   extern crate libc;
> +#[macro_use]
> +extern crate log;
>   extern crate tokio;
>   extern crate tokio_hglib;
>   extern crate tokio_process;
> +extern crate tokio_timer;
>   
>   mod attachio;
>   mod clientext;
>
>
>
> To: yuja, #hg-reviewers
> Cc: mercurial-devel
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel@mercurial-scm.org
> https://www.mercurial-scm.org/mailman/listinfo/mercurial-devel
phabricator - April 3, 2020, 12:20 p.m.
Alphare added a comment.
Alphare accepted this revision.


  Aside from the outdated futures and Rust edition that Yuya already said are temporary, this looks good!

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8360/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8360

To: yuja, #hg-reviewers, Alphare
Cc: Alphare, mercurial-devel

Patch

diff --git a/rust/chg/src/main.rs b/rust/chg/src/main.rs
--- a/rust/chg/src/main.rs
+++ b/rust/chg/src/main.rs
@@ -9,7 +9,7 @@ 
 extern crate tokio;
 extern crate tokio_hglib;
 
-use chg::locator;
+use chg::locator::Locator;
 use chg::procutil;
 use chg::{ChgClientExt, ChgUiHandler};
 use futures::sync::oneshot;
@@ -18,7 +18,6 @@ 
 use std::process;
 use std::time::Instant;
 use tokio::prelude::*;
-use tokio_hglib::UnixClient;
 
 struct DebugLogger {
     start: Instant,
@@ -64,6 +63,8 @@ 
         log::set_max_level(log::LevelFilter::Debug);
     }
 
+    // TODO: add loop detection by $CHGINTERNALMARK
+
     let code = run().unwrap_or_else(|err| {
         writeln!(io::stderr(), "chg: abort: {}", err).unwrap_or(());
         255
@@ -73,11 +74,12 @@ 
 
 fn run() -> io::Result<i32> {
     let current_dir = env::current_dir()?;
-    let sock_path = locator::prepare_server_socket_path()?;
+    let loc = Locator::prepare_from_env()?;
     let handler = ChgUiHandler::new();
     let (result_tx, result_rx) = oneshot::channel();
-    let fut = UnixClient::connect(sock_path)
-        .and_then(|client| client.set_current_dir(current_dir))
+    let fut = loc
+        .connect()
+        .and_then(|(_, client)| client.set_current_dir(current_dir))
         .and_then(|client| client.attach_io(io::stdin(), io::stdout(), io::stderr()))
         .and_then(|client| {
             let pid = client.server_spec().process_id.unwrap();
diff --git a/rust/chg/src/locator.rs b/rust/chg/src/locator.rs
--- a/rust/chg/src/locator.rs
+++ b/rust/chg/src/locator.rs
@@ -5,6 +5,7 @@ 
 
 //! Utility for locating command-server process.
 
+use futures::future::{self, Either, Loop};
 use std::env;
 use std::ffi::{OsStr, OsString};
 use std::fs::{self, DirBuilder};
@@ -12,8 +13,12 @@ 
 use std::os::unix::ffi::{OsStrExt, OsStringExt};
 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
 use std::path::{Path, PathBuf};
-use std::process;
+use std::process::{self, Command};
 use std::time::Duration;
+use tokio::prelude::*;
+use tokio_hglib::UnixClient;
+use tokio_process::{Child, CommandExt};
+use tokio_timer;
 
 use super::procutil;
 
@@ -52,13 +57,113 @@ 
         buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
         OsString::from_vec(buf).into()
     }
+
+    /// Connects to the server.
+    ///
+    /// The server process will be spawned if not running.
+    pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
+        self.try_connect()
+    }
+
+    /// Tries to connect to the existing server, or spawns new if not running.
+    fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
+        debug!("try connect to {}", self.base_sock_path.display());
+        UnixClient::connect(self.base_sock_path.clone()).then(|res| match res {
+            Ok(client) => Either::A(future::ok((self, client))),
+            Err(_) => Either::B(self.spawn_connect()),
+        })
+    }
+
+    /// Spawns new server process and connects to it.
+    ///
+    /// The server will be spawned at the current working directory, then
+    /// chdir to "/", so that the server will load configs from the target
+    /// repository.
+    fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
+        let sock_path = self.temp_sock_path();
+        debug!("start cmdserver at {}", sock_path.display());
+        Command::new(&self.hg_command)
+            .arg("serve")
+            .arg("--cmdserver")
+            .arg("chgunix")
+            .arg("--address")
+            .arg(&sock_path)
+            .arg("--daemon-postexec")
+            .arg("chdir:/")
+            .current_dir(&self.current_dir)
+            .env_clear()
+            .envs(self.env_vars.iter().cloned())
+            .env("CHGINTERNALMARK", "")
+            .spawn_async()
+            .into_future()
+            .and_then(|server| self.connect_spawned(server, sock_path))
+            .and_then(|(loc, client, sock_path)| {
+                debug!(
+                    "rename {} to {}",
+                    sock_path.display(),
+                    loc.base_sock_path.display()
+                );
+                fs::rename(&sock_path, &loc.base_sock_path)?;
+                Ok((loc, client))
+            })
+    }
+
+    /// Tries to connect to the just spawned server repeatedly until timeout
+    /// exceeded.
+    fn connect_spawned(
+        self,
+        server: Child,
+        sock_path: PathBuf,
+    ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
+        debug!("try connect to {} repeatedly", sock_path.display());
+        let connect = future::loop_fn(sock_path, |sock_path| {
+            UnixClient::connect(sock_path.clone()).then(|res| {
+                match res {
+                    Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
+                    Err(_) => {
+                        // try again with slight delay
+                        let fut = tokio_timer::sleep(Duration::from_millis(10))
+                            .map(|()| Loop::Continue(sock_path))
+                            .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
+                        Either::B(fut)
+                    }
+                }
+            })
+        });
+
+        // waits for either connection established or server failed to start
+        connect
+            .select2(server)
+            .map_err(|res| res.split().0)
+            .timeout(self.timeout)
+            .map_err(|err| {
+                err.into_inner().unwrap_or_else(|| {
+                    io::Error::new(
+                        io::ErrorKind::TimedOut,
+                        "timed out while connecting to server",
+                    )
+                })
+            })
+            .and_then(|res| {
+                match res {
+                    Either::A(((client, sock_path), server)) => {
+                        server.forget(); // continue to run in background
+                        Ok((self, client, sock_path))
+                    }
+                    Either::B((st, _)) => Err(io::Error::new(
+                        io::ErrorKind::Other,
+                        format!("server exited too early: {}", st),
+                    )),
+                }
+            })
+    }
 }
 
 /// Determines the server socket to connect to.
 ///
 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
 /// as necessary.
-pub fn prepare_server_socket_path() -> io::Result<PathBuf> {
+fn prepare_server_socket_path() -> io::Result<PathBuf> {
     if let Some(s) = env::var_os("CHGSOCKNAME") {
         Ok(PathBuf::from(s))
     } else {
diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs
--- a/rust/chg/src/lib.rs
+++ b/rust/chg/src/lib.rs
@@ -7,9 +7,12 @@ 
 #[macro_use]
 extern crate futures;
 extern crate libc;
+#[macro_use]
+extern crate log;
 extern crate tokio;
 extern crate tokio_hglib;
 extern crate tokio_process;
+extern crate tokio_timer;
 
 mod attachio;
 mod clientext;