Patchwork D8445: rust-chg: reimplement run_command operation as async function

login
register
mail settings
Submitter phabricator
Date April 23, 2020, 6:03 p.m.
Message ID <f8e823cee5f17b2efb912e5df850011a@localhost.localdomain>
Download mbox | patch
Permalink /patch/46216/
State Not Applicable
Headers show

Comments

phabricator - April 23, 2020, 6:03 p.m.
Herald added a subscriber: mercurial-patches.
Closed by commit rHG94cace4b80ea: rust-chg: reimplement run_command operation as async function (authored by yuja).
This revision was automatically updated to reflect the committed changes.
This revision was not accepted when it landed; it landed in state "Needs Review".

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D8445?vs=21122&id=21189

CHANGES SINCE LAST ACTION
  https://phab.mercurial-scm.org/D8445/new/

REVISION DETAIL
  https://phab.mercurial-scm.org/D8445

AFFECTED FILES
  rust/chg/src/lib.rs
  rust/chg/src/runcommand.rs

CHANGE DETAILS




To: yuja, #hg-reviewers, Alphare
Cc: mercurial-patches, mercurial-devel

Patch

diff --git a/rust/chg/src/runcommand.rs b/rust/chg/src/runcommand.rs
--- a/rust/chg/src/runcommand.rs
+++ b/rust/chg/src/runcommand.rs
@@ -6,164 +6,56 @@ 
 //! Functions to run Mercurial command in cHg-aware command server.
 
 use bytes::Bytes;
-use futures::future::IntoFuture;
-use futures::{Async, Future, Poll};
 use std::io;
-use std::mem;
 use std::os::unix::io::AsRawFd;
 use tokio_hglib::codec::ChannelMessage;
-use tokio_hglib::protocol::MessageLoop;
-use tokio_hglib::{Client, Connection};
+use tokio_hglib::{Connection, Protocol};
 
-use crate::attachio::AttachIo;
+use crate::attachio;
 use crate::message::{self, CommandType};
 use crate::uihandler::SystemHandler;
 
-enum AsyncS<R, S> {
-    Ready(R),
-    NotReady(S),
-    PollAgain(S),
-}
-
-enum CommandState<C, H>
-where
-    C: Connection,
-    H: SystemHandler,
-{
-    Running(MessageLoop<C>, H),
-    SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
-    AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
-    WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
-    Finished,
-}
-
-type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
-
-/// Future resolves to `(exit_code, client)`.
-#[must_use = "futures do nothing unless polled"]
-pub struct ChgRunCommand<C, H>
-where
-    C: Connection,
-    H: SystemHandler,
-{
-    state: CommandState<C, H>,
-}
-
-impl<C, H> ChgRunCommand<C, H>
-where
-    C: Connection + AsRawFd,
-    H: SystemHandler,
-{
-    pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
-        let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
-        ChgRunCommand {
-            state: CommandState::Running(msg_loop, handler),
-        }
-    }
-}
-
-impl<C, H> Future for ChgRunCommand<C, H>
-where
-    C: Connection + AsRawFd,
-    H: SystemHandler,
-{
-    type Item = (Client<C>, H, i32);
-    type Error = io::Error;
-
-    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
-        loop {
-            let state = mem::replace(&mut self.state, CommandState::Finished);
-            match state.poll()? {
-                AsyncS::Ready((client, handler, code)) => {
-                    return Ok(Async::Ready((client, handler, code)));
-                }
-                AsyncS::NotReady(newstate) => {
-                    self.state = newstate;
-                    return Ok(Async::NotReady);
-                }
-                AsyncS::PollAgain(newstate) => {
-                    self.state = newstate;
-                }
-            }
-        }
-    }
-}
-
-impl<C, H> CommandState<C, H>
-where
-    C: Connection + AsRawFd,
-    H: SystemHandler,
-{
-    fn poll(self) -> CommandPoll<C, H> {
-        match self {
-            CommandState::Running(mut msg_loop, handler) => {
-                if let Async::Ready((client, msg)) = msg_loop.poll()? {
-                    process_message(client, handler, msg)
-                } else {
-                    Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
-                }
-            }
-            CommandState::SpawningPager(client, mut fut) => {
-                if let Async::Ready((handler, pin)) = fut.poll()? {
-                    let fut = AttachIo::with_client(client, io::stdin(), pin, None);
-                    Ok(AsyncS::PollAgain(CommandState::AttachingPager(
-                        fut, handler,
-                    )))
-                } else {
-                    Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
-                }
-            }
-            CommandState::AttachingPager(mut fut, handler) => {
-                if let Async::Ready(client) = fut.poll()? {
-                    let msg_loop = MessageLoop::start(client, b""); // terminator
-                    Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
-                } else {
-                    Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
-                }
-            }
-            CommandState::WaitingSystem(client, mut fut) => {
-                if let Async::Ready((handler, code)) = fut.poll()? {
-                    let data = message::pack_result_code(code);
-                    let msg_loop = MessageLoop::resume_with_data(client, data);
-                    Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
-                } else {
-                    Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
-                }
-            }
-            CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
-        }
-    }
-}
-
-fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
-where
-    C: Connection,
-    H: SystemHandler,
-{
-    {
-        match msg {
+/// Runs the given Mercurial command in cHg-aware command server, and
+/// fetches the result code.
+///
+/// This is a subset of tokio-hglib's `run_command()` with the additional
+/// SystemRequest support.
+pub async fn run_command(
+    proto: &mut Protocol<impl Connection + AsRawFd>,
+    handler: &mut impl SystemHandler,
+    packed_args: impl Into<Bytes>,
+) -> io::Result<i32> {
+    proto
+        .send_command_with_args("runcommand", packed_args)
+        .await?;
+    loop {
+        match proto.fetch_response().await? {
             ChannelMessage::Data(b'r', data) => {
-                let code = message::parse_result_code(data)?;
-                Ok(AsyncS::Ready((client, handler, code)))
+                return message::parse_result_code(data);
             }
             ChannelMessage::Data(..) => {
                 // just ignores data sent to optional channel
-                let msg_loop = MessageLoop::resume(client);
-                Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
             }
-            ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(
-                io::Error::new(io::ErrorKind::InvalidData, "unsupported request"),
-            ),
+            ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => {
+                return Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "unsupported request",
+                ));
+            }
             ChannelMessage::SystemRequest(data) => {
                 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
                 match cmd_type {
                     CommandType::Pager => {
-                        let fut = handler.spawn_pager(cmd_spec).into_future();
-                        Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
+                        // server spins new command loop while pager request is
+                        // in progress, which can be terminated by "" command.
+                        let pin = handler.spawn_pager(&cmd_spec).await?;
+                        attachio::attach_io(proto, &io::stdin(), &pin, &pin).await?;
+                        proto.send_command("").await?; // terminator
                     }
                     CommandType::System => {
-                        let fut = handler.run_system(cmd_spec).into_future();
-                        Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
+                        let code = handler.run_system(&cmd_spec).await?;
+                        let data = message::pack_result_code(code);
+                        proto.send_data(data).await?;
                     }
                 }
             }
diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs
--- a/rust/chg/src/lib.rs
+++ b/rust/chg/src/lib.rs
@@ -8,7 +8,7 @@ 
 //pub mod locator;
 pub mod message;
 pub mod procutil;
-//mod runcommand;
+mod runcommand;
 mod uihandler;
 
 //pub use clientext::ChgClientExt;