Patchwork [2,of,4,RFC] chgcache: implement simple IPC mechanism

login
register
mail settings
Submitter Jun Wu
Date Feb. 9, 2017, 1:41 a.m.
Message ID <8410c4a670ffff3bed4b.1486604462@x1c>
Download mbox | patch
Permalink /patch/18357/
State Changes Requested
Delegated to: Yuya Nishihara
Headers show

Comments

Jun Wu - Feb. 9, 2017, 1:41 a.m.
# HG changeset patch
# User Jun Wu <quark@fb.com>
# Date 1486601798 28800
#      Wed Feb 08 16:56:38 2017 -0800
# Node ID 8410c4a670ffff3bed4b459cf8d62bd32fdcb1e7
# Parent  138f7ba58a70de9610713b8bd55d1ba0ac468fa6
# Available At https://bitbucket.org/quark-zju/hg-draft
#              hg pull https://bitbucket.org/quark-zju/hg-draft -r 8410c4a670ff
chgcache: implement simple IPC mechanism

We need an inter-process communication mechanism so forked chg workers could
tell the master chg server where the repos are. The IPC mechanism:

  - Could be one-way - workers send messages to the master. Currently
    workers won't read from the master.
  - Should be non-blocking on write - workers should not wait for the master
    to read content they send.
  - Could lose messages - messages are just some "suggestions" about what to
    preload, which could be discarded safely.
  - Better to be blocking on read - if reading is blocking, the master
    server could learn what to preload immediately, without polling
    periodically.

This patch adds a class using datagram sockets to do the IPC. The choice is
mainly because SOCK_DGRAM prevents incomplete messages from being sent, and
we don't need to deal with message boundaries.
Yuya Nishihara - Feb. 13, 2017, 2 p.m.
On Wed, 8 Feb 2017 17:41:02 -0800, Jun Wu wrote:
> # HG changeset patch
> # User Jun Wu <quark@fb.com>
> # Date 1486601798 28800
> #      Wed Feb 08 16:56:38 2017 -0800
> # Node ID 8410c4a670ffff3bed4b459cf8d62bd32fdcb1e7
> # Parent  138f7ba58a70de9610713b8bd55d1ba0ac468fa6
> # Available At https://bitbucket.org/quark-zju/hg-draft
> #              hg pull https://bitbucket.org/quark-zju/hg-draft -r 8410c4a670ff
> chgcache: implement simple IPC mechanism
> 
> We need an inter-process communication mechanism so forked chg workers could
> tell the master chg server where the repos are. The IPC mechanism:
> 
>   - Could be one-way - workers send messages to the master. Currently
>     workers won't read from the master.
>   - Should be non-blocking on write - workers should not wait for the master
>     to read content they send.
>   - Could lose messages - messages are just some "suggestions" about what to
>     preload, which could be discarded safely.
>   - Better to be blocking on read - if reading is blocking, the master
>     server could learn what to preload immediately, without polling
>     periodically.
> 
> This patch adds a class using datagram sockets to do the IPC. The choice is
> mainly because SOCK_DGRAM prevents incomplete messages from being sent, and
> we don't need to deal with message boundaries.
> 
> diff --git a/hgext/chgcache.py b/hgext/chgcache.py
> --- a/hgext/chgcache.py
> +++ b/hgext/chgcache.py
> @@ -10,2 +10,56 @@ With this extension installed, Mercurial
>  repo objects to further reduce start-up time.
>  """
> +from __future__ import absolute_import
> +
> +import socket
> +
> +class socketipc(object):
> +    """A simple IPC mechanism that sets up an unreliable communication channel
> +    between the master server and (multiple) forked worker processes. The
> +    forked workers do non-blocking writes, while the master server does
> +    blocking reads.
> +
> +    To use the object, create it in the master server, read from a thread, and
> +    write from forked processes:
> +
> +        # pid=1000, master, main thread
> +        ipc = socketipc()
> +
> +        # pid=1000, master, a background thread
> +        while True:
> +            msg = ipc.recv() # blocking
> +            ....
> +
> +        # pid=1001, worker
> +        ipc.send('foo') # non-blocking, silently ignore errors
> +
> +        # pid=1002, worker
> +        ipc.send('bar') # non-blocking, silently ignore errors
> +    """
> +
> +    suffix = b'\0'  # to detect truncated recv()s
> +    maxsize = 1 << 16  # estimated max packet size for recv()
> +
> +    def __init__(self):
> +        self._in, self._out = socket.socketpair(socket.AF_UNIX,
> +                                                socket.SOCK_DGRAM)
> +        self._out.setblocking(False)

Why not os.pipe()?

We share the same (dup-ed) file descriptors. In this case, the write end can
be used by many forked processes, but IIRC the read end can't. Only one
reader can read out a message. So I think there's no reason to set up a
full duplex IPC channel.
Jun Wu - Feb. 13, 2017, 5:46 p.m.
Excerpts from Yuya Nishihara's message of 2017-02-13 23:00:25 +0900:
> Why not os.pipe()?
>
> We share the same (dup-ed) file descriptors. In this case, the write end can
> be used by many forked processes, but IIRC the read end can't. Only one
> reader can read out a message. So I think there's no reason to set up a
> full duplex IPC channel.

Duplex IRC is not needed. But I do want to use DGRAM.

DGRAM makes it impossible to send incomplete messages. I think that makes
the code simpler and maybe more efficient.
Yuya Nishihara - Feb. 14, 2017, 1:49 p.m.
On Mon, 13 Feb 2017 09:46:21 -0800, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2017-02-13 23:00:25 +0900:
> > Why not os.pipe()?
> >
> > We share the same (dup-ed) file descriptors. In this case, the write end can
> > be used by many forked processes, but IIRC the read end can't. Only one
> > reader can read out a message. So I think there's no reason to set up a
> > full duplex IPC channel.
> 
> Duplex IRC is not needed. But I do want to use DGRAM.
> 
> DGRAM makes it impossible to send incomplete messages. I think that makes
> the code simpler and maybe more efficient.

Ok.

Perhaps you won't need the suffix as datagram socket is message oriented?
Jun Wu - Feb. 14, 2017, 7:05 p.m.
Excerpts from Yuya Nishihara's message of 2017-02-14 22:49:34 +0900:
> On Mon, 13 Feb 2017 09:46:21 -0800, Jun Wu wrote:
> > Excerpts from Yuya Nishihara's message of 2017-02-13 23:00:25 +0900:
> > > Why not os.pipe()?
> > >
> > > We share the same (dup-ed) file descriptors. In this case, the write end can
> > > be used by many forked processes, but IIRC the read end can't. Only one
> > > reader can read out a message. So I think there's no reason to set up a
> > > full duplex IPC channel.
> > 
> > Duplex IRC is not needed. But I do want to use DGRAM.
> > 
> > DGRAM makes it impossible to send incomplete messages. I think that makes
> > the code simpler and maybe more efficient.
> 
> Ok.
> 
> Perhaps you won't need the suffix as datagram socket is message oriented?

Right. It's not needed if we can figure out the message length first.

That could be done via "fcntl.ioctl(fd, termios.FIONREAD, buf)", which won't
work on Windows. I think that's fine since we have so many non-Windows code
already.
Yuya Nishihara - Feb. 15, 2017, 1 p.m.
On Tue, 14 Feb 2017 11:05:00 -0800, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2017-02-14 22:49:34 +0900:
> > On Mon, 13 Feb 2017 09:46:21 -0800, Jun Wu wrote:
> > > Excerpts from Yuya Nishihara's message of 2017-02-13 23:00:25 +0900:
> > > > Why not os.pipe()?
> > > >
> > > > We share the same (dup-ed) file descriptors. In this case, the write end can
> > > > be used by many forked processes, but IIRC the read end can't. Only one
> > > > reader can read out a message. So I think there's no reason to set up a
> > > > full duplex IPC channel.
> > > 
> > > Duplex IRC is not needed. But I do want to use DGRAM.
> > > 
> > > DGRAM makes it impossible to send incomplete messages. I think that makes
> > > the code simpler and maybe more efficient.
> > 
> > Ok.
> > 
> > Perhaps you won't need the suffix as datagram socket is message oriented?
> 
> Right. It's not needed if we can figure out the message length first.

I don't know the datagram unix socket, but if it acts the same as UDP socket,
recv() will read exactly one message. Contiguous messages won't be streamed.

> That could be done via "fcntl.ioctl(fd, termios.FIONREAD, buf)", which won't
> work on Windows. I think that's fine since we have so many non-Windows code
> already.

Sure. chg will never run on Windows.

Patch

diff --git a/hgext/chgcache.py b/hgext/chgcache.py
--- a/hgext/chgcache.py
+++ b/hgext/chgcache.py
@@ -10,2 +10,56 @@  With this extension installed, Mercurial
 repo objects to further reduce start-up time.
 """
+from __future__ import absolute_import
+
+import socket
+
+class socketipc(object):
+    """A simple IPC mechanism that sets up an unreliable communication channel
+    between the master server and (multiple) forked worker processes. The
+    forked workers do non-blocking writes, while the master server does
+    blocking reads.
+
+    To use the object, create it in the master server, read from a thread, and
+    write from forked processes:
+
+        # pid=1000, master, main thread
+        ipc = socketipc()
+
+        # pid=1000, master, a background thread
+        while True:
+            msg = ipc.recv() # blocking
+            ....
+
+        # pid=1001, worker
+        ipc.send('foo') # non-blocking, silently ignore errors
+
+        # pid=1002, worker
+        ipc.send('bar') # non-blocking, silently ignore errors
+    """
+
+    suffix = b'\0'  # to detect truncated recv()s
+    maxsize = 1 << 16  # estimated max packet size for recv()
+
+    def __init__(self):
+        self._in, self._out = socket.socketpair(socket.AF_UNIX,
+                                                socket.SOCK_DGRAM)
+        self._out.setblocking(False)
+
+    def send(self, msg):
+        """send msg without blocking. fail silently on errors, ex. msg is too
+        long, or the queue is full. msg should not contain '\0'.
+        """
+        try:
+            return self._out.send(msg + self.suffix)
+        except socket.error:
+            pass
+
+    def recv(self):
+        """receive a complete msg. blocking."""
+        while True:
+            try:
+                msg = self._in.recv(self.maxsize)
+                if msg.endswith(self.suffix):
+                    return msg[:-1]
+            except socket.error:
+                pass