Patchwork [3,of,7] commandserver: add new forking server implemented without using SocketServer

login
register
mail settings
Submitter Yuya Nishihara
Date July 14, 2016, 3:20 p.m.
Message ID <e56508ea57c23af68d05.1468509641@mimosa>
Download mbox | patch
Permalink /patch/15856/
State Accepted
Headers show

Comments

Yuya Nishihara - July 14, 2016, 3:20 p.m.
# HG changeset patch
# User Yuya Nishihara <yuya@tcha.org>
# Date 1463884998 -32400
#      Sun May 22 11:43:18 2016 +0900
# Node ID e56508ea57c23af68d05d941d344321b6ade8184
# Parent  e374627f118e8aca6243d263154aab9309abf1fc
commandserver: add new forking server implemented without using SocketServer

SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:

 - race condition that leads to 100% CPU usage (Python 2.6)
   https://bugs.python.org/issue21491
 - can't wait for children belonging to different process groups (Python 2.6)
 - leaves at least one zombie process (Python 2.6, 2.7)
   https://bugs.python.org/issue11109

The first two are critical because we do setpgid(0, 0) in child process to
isolate terminal signals. The last one isn't, but ForkingMixIn seems to be
doing silly. So there are two choices:

 a) backport and maintain SocketServer until we can drop support for Python 2.x
 b) replace SocketServer by simpler one and eliminate glue codes

I chose (b) because it's great time for getting rid of utterly complicated
SocketServer stuff, and preparing for future move towards prefork service.

New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It
is monolithic but much simpler than SocketServer. unixservicehandler provides
customizing points for chg, and it will be shared with future prefork service.

Old unixservice class is still used by chgserver. It will be removed later.

Thanks to Jun Wu for investigating these issues.
Jun Wu - July 15, 2016, 3:27 p.m.
Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900:
> # HG changeset patch
> # User Yuya Nishihara <yuya@tcha.org>
> # Date 1463884998 -32400
> #      Sun May 22 11:43:18 2016 +0900
> # Node ID e56508ea57c23af68d05d941d344321b6ade8184
> # Parent  e374627f118e8aca6243d263154aab9309abf1fc
> commandserver: add new forking server implemented without using SocketServer
> 
> SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as:
> 
>  - race condition that leads to 100% CPU usage (Python 2.6)
>    https://bugs.python.org/issue21491 
>  - can't wait for children belonging to different process groups (Python 2.6)
>  - leaves at least one zombie process (Python 2.6, 2.7)
>    https://bugs.python.org/issue11109 
> 
> The first two are critical because we do setpgid(0, 0) in child process to
> isolate terminal signals. The last one isn't, but ForkingMixIn seems to be
> doing silly. So there are two choices:
> 
>  a) backport and maintain SocketServer until we can drop support for Python 2.x
>  b) replace SocketServer by simpler one and eliminate glue codes
> 
> I chose (b) because it's great time for getting rid of utterly complicated
> SocketServer stuff, and preparing for future move towards prefork service.
> 
> New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It
> is monolithic but much simpler than SocketServer. unixservicehandler provides
> customizing points for chg, and it will be shared with future prefork service.
> 
> Old unixservice class is still used by chgserver. It will be removed later.
> 
> Thanks to Jun Wu for investigating these issues.
> 
> diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
> --- a/mercurial/commandserver.py
> +++ b/mercurial/commandserver.py
> @@ -11,6 +11,9 @@ import errno
>  import gc
>  import os
>  import random
> +import select
> +import signal
> +import socket
>  import struct
>  import sys
>  import traceback
> @@ -385,6 +388,41 @@ def _serverequest(ui, repo, conn, create
>              # trigger __del__ since ForkingMixIn uses os._exit
>              gc.collect()
>  
> +class unixservicehandler(object):
> +    """Set of pluggable operations for unix-mode services
> +
> +    Almost all methods except for createcmdserver() are called in the main
> +    process. You can't pass mutable resource back from createcmdserver().
> +    """
> +
> +    pollinterval = None
> +
> +    def __init__(self, ui):
> +        self.ui = ui

Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui"
whenever possible.

For chgunixservicehandler, we can change its signature a bit and pass the
configs from the caller: __init__(self, idletimeout, skiphash)

> +    def bindsocket(self, sock, address):
> +        util.bindunixsocket(sock, address)
> +
> +    def unlinksocket(self, address):
> +        os.unlink(address)
> +
> +    def printbanner(self, address):
> +        self.ui.status(_('listening at %s\n') % address)
> +        self.ui.flush()  # avoid buffering of status message

To get rid of "self.ui", it could be a boolean, or a function returning a
string.

> +    def shouldexit(self):
> +        """True if server should shut down; checked per pollinterval"""
> +        return False
> +
> +    def newconnection(self):
> +        """Called when main process notices new connection"""
> +        pass
> +
> +    def createcmdserver(self, repo, conn, fin, fout):

And add a ui parameter here so we can remove "self.ui = ui". As "repo" is
here already, it will feel more consistent.

> +        """Create new command server instance; called in the process that
> +        serves for the current connection"""
> +        return server(self.ui, repo, fin, fout)
> +
>  class _requesthandler(socketserver.BaseRequestHandler):
>      def handle(self):
>          _serverequest(self.server.ui, self.server.repo, self.request,
> @@ -424,9 +462,96 @@ class unixservice(object):
>          finally:
>              self._cleanup()
>  
> +class unixforkingservice(unixservice):
> +    def __init__(self, ui, repo, opts, handler=None):
> +        super(unixforkingservice, self).__init__(ui, repo, opts)
> +        self._servicehandler = handler or unixservicehandler(ui)
> +        self._sock = None
> +        self._oldsigchldhandler = None
> +        self._workerpids = set()  # updated by signal handler; do not iterate
> +
> +    def init(self):
> +        self._sock = socket.socket(socket.AF_UNIX)
> +        self._servicehandler.bindsocket(self._sock, self.address)
> +        self._sock.listen(5)

Maybe make 5 a class constant, or as we have ui, read from some config
option. But that can be a later patch.

> +        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
> +        self._oldsigchldhandler = o
> +        self._servicehandler.printbanner(self.address)
> +
> +    def _cleanup(self):
> +        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
> +        self._sock.close()
> +        self._servicehandler.unlinksocket(self.address)
> +        # don't kill child processes as they have active clients, just wait
> +        self._reapworkers(0)
> +
> +    def run(self):
> +        try:
> +            self._mainloop()
> +        finally:
> +            self._cleanup()
> +
> +    def _mainloop(self):
> +        h = self._servicehandler
> +        while not h.shouldexit():
> +            try:
> +                ready = select.select([self._sock], [], [], h.pollinterval)[0]
> +                if not ready:
> +                    continue
> +                conn, _addr = self._sock.accept()
> +            except (select.error, socket.error) as inst:
> +                if inst.args[0] == errno.EINTR:
> +                    continue
> +                raise
> +
> +            pid = os.fork()
> +            if pid:
> +                try:
> +                    self.ui.debug('forked worker process (pid=%d)\n' % pid)
> +                    self._workerpids.add(pid)
> +                    h.newconnection()
> +                finally:
> +                    conn.close()  # release handle in parent process
> +            else:
> +                try:
> +                    self._serveworker(conn)
> +                    conn.close()
> +                    os._exit(0)
> +                except:  # never return, hence no re-raises
> +                    try:
> +                        self.ui.traceback(force=True)
> +                    finally:
> +                        os._exit(255)
> +
> +    def _sigchldhandler(self, signal, frame):
> +        self._reapworkers(os.WNOHANG)
> +
> +    def _reapworkers(self, options):
> +        while self._workerpids:
> +            try:
> +                pid, _status = os.waitpid(-1, options)
> +            except OSError as inst:
> +                if inst.errno == errno.EINTR:
> +                    continue
> +                if inst.errno != errno.ECHILD:
> +                    raise
> +                # no child processes at all (reaped by other waitpid()?)
> +                self._workerpids.clear()
> +                return
> +            if pid == 0:
> +                # no waitable child processes
> +                return
> +            self.ui.debug('worker process exited (pid=%d)\n' % pid)
> +            self._workerpids.discard(pid)
> +
> +    def _serveworker(self, conn):

I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb
and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it
to mainloop (makes sense to me since it's simple and the fork logic is
already there, and I think we don't care about subclass now).

> +        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
> +        h = self._servicehandler
> +        _serverequest(self.ui, self.repo, conn, h.createcmdserver)
> +
>  _servicemap = {
>      'pipe': pipeservice,
> -    'unix': unixservice,
> +    'unix': unixforkingservice,
>      }
>  
>  def createservice(ui, repo, opts):
Yuya Nishihara - July 15, 2016, 4:02 p.m.
On Fri, 15 Jul 2016 16:27:56 +0100, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900:
> > # HG changeset patch
> > # User Yuya Nishihara <yuya@tcha.org>
> > # Date 1463884998 -32400
> > #      Sun May 22 11:43:18 2016 +0900
> > # Node ID e56508ea57c23af68d05d941d344321b6ade8184
> > # Parent  e374627f118e8aca6243d263154aab9309abf1fc
> > commandserver: add new forking server implemented without using SocketServer

> > +class unixservicehandler(object):
> > +    """Set of pluggable operations for unix-mode services
> > +
> > +    Almost all methods except for createcmdserver() are called in the main
> > +    process. You can't pass mutable resource back from createcmdserver().
> > +    """
> > +
> > +    pollinterval = None
> > +
> > +    def __init__(self, ui):
> > +        self.ui = ui  
> 
> Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui"
> whenever possible.

Good question. I made it keep ui because future patches will replace _log(),
which is noop now, by ui.debug().

> > +    def createcmdserver(self, repo, conn, fin, fout):  
> 
> And add a ui parameter here so we can remove "self.ui = ui". As "repo" is
> here already, it will feel more consistent.

Yep, I initially designed it to accept (ui, repo, ...), but dropped ui because
of the reason above.

> > +class unixforkingservice(unixservice):
> > +    def __init__(self, ui, repo, opts, handler=None):
> > +        super(unixforkingservice, self).__init__(ui, repo, opts)
> > +        self._servicehandler = handler or unixservicehandler(ui)
> > +        self._sock = None
> > +        self._oldsigchldhandler = None
> > +        self._workerpids = set()  # updated by signal handler; do not iterate
> > +
> > +    def init(self):
> > +        self._sock = socket.socket(socket.AF_UNIX)
> > +        self._servicehandler.bindsocket(self._sock, self.address)
> > +        self._sock.listen(5)  
> 
> Maybe make 5 a class constant, or as we have ui, read from some config
> option. But that can be a later patch.

A class constant seems good. I'll do that if I need to revise this patch.

> > +    def _serveworker(self, conn):  
> 
> I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb
> and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it
> to mainloop (makes sense to me since it's simple and the fork logic is
> already there, and I think we don't care about subclass now).

I'm going to move the initialization of the worker process to this function,
which is why I avoid calling it _serverequest(). If we add a prefork server,
setpgid() and random.seed() shouldn't be executed per request, so they will
be moved from _serverequest() to unixforkingservice._serveworker().
Jun Wu - July 15, 2016, 4:29 p.m.
Excerpts from Yuya Nishihara's message of 2016-07-16 01:02:51 +0900:
> On Fri, 15 Jul 2016 16:27:56 +0100, Jun Wu wrote:
> > Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900:
> > > # HG changeset patch
> > > # User Yuya Nishihara <yuya@tcha.org>
> > > # Date 1463884998 -32400
> > > #      Sun May 22 11:43:18 2016 +0900
> > > # Node ID e56508ea57c23af68d05d941d344321b6ade8184
> > > # Parent  e374627f118e8aca6243d263154aab9309abf1fc
> > > commandserver: add new forking server implemented without using SocketServer
> 
> > > +class unixservicehandler(object):
> > > +    """Set of pluggable operations for unix-mode services
> > > +
> > > +    Almost all methods except for createcmdserver() are called in the main
> > > +    process. You can't pass mutable resource back from createcmdserver().
> > > +    """
> > > +
> > > +    pollinterval = None
> > > +
> > > +    def __init__(self, ui):
> > > +        self.ui = ui  
> > 
> > Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui"
> > whenever possible.
> 
> Good question. I made it keep ui because future patches will replace _log(),
> which is noop now, by ui.debug().

That makes sense. It seems hard to change the architecture. Although if we
really want to, we can pass the file object.

> > > +    def createcmdserver(self, repo, conn, fin, fout):  
> > 
> > And add a ui parameter here so we can remove "self.ui = ui". As "repo" is
> > here already, it will feel more consistent.
> 
> Yep, I initially designed it to accept (ui, repo, ...), but dropped ui because
> of the reason above.
> 
> > > +class unixforkingservice(unixservice):
> > > +    def __init__(self, ui, repo, opts, handler=None):
> > > +        super(unixforkingservice, self).__init__(ui, repo, opts)
> > > +        self._servicehandler = handler or unixservicehandler(ui)
> > > +        self._sock = None
> > > +        self._oldsigchldhandler = None
> > > +        self._workerpids = set()  # updated by signal handler; do not iterate
> > > +
> > > +    def init(self):
> > > +        self._sock = socket.socket(socket.AF_UNIX)
> > > +        self._servicehandler.bindsocket(self._sock, self.address)
> > > +        self._sock.listen(5)  
> > 
> > Maybe make 5 a class constant, or as we have ui, read from some config
> > option. But that can be a later patch.
> 
> A class constant seems good. I'll do that if I need to revise this patch.
> 
> > > +    def _serveworker(self, conn):  
> > 
> > I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb
> > and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it
> > to mainloop (makes sense to me since it's simple and the fork logic is
> > already there, and I think we don't care about subclass now).
> 
> I'm going to move the initialization of the worker process to this function,
> which is why I avoid calling it _serverequest(). If we add a prefork server,
> setpgid() and random.seed() shouldn't be executed per request, so they will
> be moved from _serverequest() to unixforkingservice._serveworker().

Then how about "_runworker"? Anyway I don't feel strong as we don't need to
care about subclassing now.
Yuya Nishihara - July 16, 2016, 1:04 a.m.
On Fri, 15 Jul 2016 17:29:50 +0100, Jun Wu wrote:
> Excerpts from Yuya Nishihara's message of 2016-07-16 01:02:51 +0900:
> > > > +    def _serveworker(self, conn):  
> > > 
> > > I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb
> > > and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it
> > > to mainloop (makes sense to me since it's simple and the fork logic is
> > > already there, and I think we don't care about subclass now).
> > 
> > I'm going to move the initialization of the worker process to this function,
> > which is why I avoid calling it _serverequest(). If we add a prefork server,
> > setpgid() and random.seed() shouldn't be executed per request, so they will
> > be moved from _serverequest() to unixforkingservice._serveworker().
> 
> Then how about "_runworker"? Anyway I don't feel strong as we don't need to
> care about subclassing now.

Sounds better. I'll send a follow-up patch.

Patch

diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
--- a/mercurial/commandserver.py
+++ b/mercurial/commandserver.py
@@ -11,6 +11,9 @@  import errno
 import gc
 import os
 import random
+import select
+import signal
+import socket
 import struct
 import sys
 import traceback
@@ -385,6 +388,41 @@  def _serverequest(ui, repo, conn, create
             # trigger __del__ since ForkingMixIn uses os._exit
             gc.collect()
 
+class unixservicehandler(object):
+    """Set of pluggable operations for unix-mode services
+
+    Almost all methods except for createcmdserver() are called in the main
+    process. You can't pass mutable resource back from createcmdserver().
+    """
+
+    pollinterval = None
+
+    def __init__(self, ui):
+        self.ui = ui
+
+    def bindsocket(self, sock, address):
+        util.bindunixsocket(sock, address)
+
+    def unlinksocket(self, address):
+        os.unlink(address)
+
+    def printbanner(self, address):
+        self.ui.status(_('listening at %s\n') % address)
+        self.ui.flush()  # avoid buffering of status message
+
+    def shouldexit(self):
+        """True if server should shut down; checked per pollinterval"""
+        return False
+
+    def newconnection(self):
+        """Called when main process notices new connection"""
+        pass
+
+    def createcmdserver(self, repo, conn, fin, fout):
+        """Create new command server instance; called in the process that
+        serves for the current connection"""
+        return server(self.ui, repo, fin, fout)
+
 class _requesthandler(socketserver.BaseRequestHandler):
     def handle(self):
         _serverequest(self.server.ui, self.server.repo, self.request,
@@ -424,9 +462,96 @@  class unixservice(object):
         finally:
             self._cleanup()
 
+class unixforkingservice(unixservice):
+    def __init__(self, ui, repo, opts, handler=None):
+        super(unixforkingservice, self).__init__(ui, repo, opts)
+        self._servicehandler = handler or unixservicehandler(ui)
+        self._sock = None
+        self._oldsigchldhandler = None
+        self._workerpids = set()  # updated by signal handler; do not iterate
+
+    def init(self):
+        self._sock = socket.socket(socket.AF_UNIX)
+        self._servicehandler.bindsocket(self._sock, self.address)
+        self._sock.listen(5)
+        o = signal.signal(signal.SIGCHLD, self._sigchldhandler)
+        self._oldsigchldhandler = o
+        self._servicehandler.printbanner(self.address)
+
+    def _cleanup(self):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        self._sock.close()
+        self._servicehandler.unlinksocket(self.address)
+        # don't kill child processes as they have active clients, just wait
+        self._reapworkers(0)
+
+    def run(self):
+        try:
+            self._mainloop()
+        finally:
+            self._cleanup()
+
+    def _mainloop(self):
+        h = self._servicehandler
+        while not h.shouldexit():
+            try:
+                ready = select.select([self._sock], [], [], h.pollinterval)[0]
+                if not ready:
+                    continue
+                conn, _addr = self._sock.accept()
+            except (select.error, socket.error) as inst:
+                if inst.args[0] == errno.EINTR:
+                    continue
+                raise
+
+            pid = os.fork()
+            if pid:
+                try:
+                    self.ui.debug('forked worker process (pid=%d)\n' % pid)
+                    self._workerpids.add(pid)
+                    h.newconnection()
+                finally:
+                    conn.close()  # release handle in parent process
+            else:
+                try:
+                    self._serveworker(conn)
+                    conn.close()
+                    os._exit(0)
+                except:  # never return, hence no re-raises
+                    try:
+                        self.ui.traceback(force=True)
+                    finally:
+                        os._exit(255)
+
+    def _sigchldhandler(self, signal, frame):
+        self._reapworkers(os.WNOHANG)
+
+    def _reapworkers(self, options):
+        while self._workerpids:
+            try:
+                pid, _status = os.waitpid(-1, options)
+            except OSError as inst:
+                if inst.errno == errno.EINTR:
+                    continue
+                if inst.errno != errno.ECHILD:
+                    raise
+                # no child processes at all (reaped by other waitpid()?)
+                self._workerpids.clear()
+                return
+            if pid == 0:
+                # no waitable child processes
+                return
+            self.ui.debug('worker process exited (pid=%d)\n' % pid)
+            self._workerpids.discard(pid)
+
+    def _serveworker(self, conn):
+        signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
+        h = self._servicehandler
+        _serverequest(self.ui, self.repo, conn, h.createcmdserver)
+
 _servicemap = {
     'pipe': pipeservice,
-    'unix': unixservice,
+    'unix': unixforkingservice,
     }
 
 def createservice(ui, repo, opts):