Submitter | Yuya Nishihara |
---|---|
Date | July 14, 2016, 3:20 p.m. |
Message ID | <e56508ea57c23af68d05.1468509641@mimosa> |
Download | mbox | patch |
Permalink | /patch/15856/ |
State | Accepted |
Headers | show |
Comments
Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900: > # HG changeset patch > # User Yuya Nishihara <yuya@tcha.org> > # Date 1463884998 -32400 > # Sun May 22 11:43:18 2016 +0900 > # Node ID e56508ea57c23af68d05d941d344321b6ade8184 > # Parent e374627f118e8aca6243d263154aab9309abf1fc > commandserver: add new forking server implemented without using SocketServer > > SocketServer.ForkingMixIn of Python 2.x has a couple of issues, such as: > > - race condition that leads to 100% CPU usage (Python 2.6) > https://bugs.python.org/issue21491 > - can't wait for children belonging to different process groups (Python 2.6) > - leaves at least one zombie process (Python 2.6, 2.7) > https://bugs.python.org/issue11109 > > The first two are critical because we do setpgid(0, 0) in child process to > isolate terminal signals. The last one isn't, but ForkingMixIn seems to be > doing silly. So there are two choices: > > a) backport and maintain SocketServer until we can drop support for Python 2.x > b) replace SocketServer by simpler one and eliminate glue codes > > I chose (b) because it's great time for getting rid of utterly complicated > SocketServer stuff, and preparing for future move towards prefork service. > > New unixforkingservice is implemented loosely based on chg 531f8ef64be6. It > is monolithic but much simpler than SocketServer. unixservicehandler provides > customizing points for chg, and it will be shared with future prefork service. > > Old unixservice class is still used by chgserver. It will be removed later. > > Thanks to Jun Wu for investigating these issues. > > diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py > --- a/mercurial/commandserver.py > +++ b/mercurial/commandserver.py > @@ -11,6 +11,9 @@ import errno > import gc > import os > import random > +import select > +import signal > +import socket > import struct > import sys > import traceback > @@ -385,6 +388,41 @@ def _serverequest(ui, repo, conn, create > # trigger __del__ since ForkingMixIn uses os._exit > gc.collect() > > +class unixservicehandler(object): > + """Set of pluggable operations for unix-mode services > + > + Almost all methods except for createcmdserver() are called in the main > + process. You can't pass mutable resource back from createcmdserver(). > + """ > + > + pollinterval = None > + > + def __init__(self, ui): > + self.ui = ui Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui" whenever possible. For chgunixservicehandler, we can change its signature a bit and pass the configs from the caller: __init__(self, idletimeout, skiphash) > + def bindsocket(self, sock, address): > + util.bindunixsocket(sock, address) > + > + def unlinksocket(self, address): > + os.unlink(address) > + > + def printbanner(self, address): > + self.ui.status(_('listening at %s\n') % address) > + self.ui.flush() # avoid buffering of status message To get rid of "self.ui", it could be a boolean, or a function returning a string. > + def shouldexit(self): > + """True if server should shut down; checked per pollinterval""" > + return False > + > + def newconnection(self): > + """Called when main process notices new connection""" > + pass > + > + def createcmdserver(self, repo, conn, fin, fout): And add a ui parameter here so we can remove "self.ui = ui". As "repo" is here already, it will feel more consistent. > + """Create new command server instance; called in the process that > + serves for the current connection""" > + return server(self.ui, repo, fin, fout) > + > class _requesthandler(socketserver.BaseRequestHandler): > def handle(self): > _serverequest(self.server.ui, self.server.repo, self.request, > @@ -424,9 +462,96 @@ class unixservice(object): > finally: > self._cleanup() > > +class unixforkingservice(unixservice): > + def __init__(self, ui, repo, opts, handler=None): > + super(unixforkingservice, self).__init__(ui, repo, opts) > + self._servicehandler = handler or unixservicehandler(ui) > + self._sock = None > + self._oldsigchldhandler = None > + self._workerpids = set() # updated by signal handler; do not iterate > + > + def init(self): > + self._sock = socket.socket(socket.AF_UNIX) > + self._servicehandler.bindsocket(self._sock, self.address) > + self._sock.listen(5) Maybe make 5 a class constant, or as we have ui, read from some config option. But that can be a later patch. > + o = signal.signal(signal.SIGCHLD, self._sigchldhandler) > + self._oldsigchldhandler = o > + self._servicehandler.printbanner(self.address) > + > + def _cleanup(self): > + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) > + self._sock.close() > + self._servicehandler.unlinksocket(self.address) > + # don't kill child processes as they have active clients, just wait > + self._reapworkers(0) > + > + def run(self): > + try: > + self._mainloop() > + finally: > + self._cleanup() > + > + def _mainloop(self): > + h = self._servicehandler > + while not h.shouldexit(): > + try: > + ready = select.select([self._sock], [], [], h.pollinterval)[0] > + if not ready: > + continue > + conn, _addr = self._sock.accept() > + except (select.error, socket.error) as inst: > + if inst.args[0] == errno.EINTR: > + continue > + raise > + > + pid = os.fork() > + if pid: > + try: > + self.ui.debug('forked worker process (pid=%d)\n' % pid) > + self._workerpids.add(pid) > + h.newconnection() > + finally: > + conn.close() # release handle in parent process > + else: > + try: > + self._serveworker(conn) > + conn.close() > + os._exit(0) > + except: # never return, hence no re-raises > + try: > + self.ui.traceback(force=True) > + finally: > + os._exit(255) > + > + def _sigchldhandler(self, signal, frame): > + self._reapworkers(os.WNOHANG) > + > + def _reapworkers(self, options): > + while self._workerpids: > + try: > + pid, _status = os.waitpid(-1, options) > + except OSError as inst: > + if inst.errno == errno.EINTR: > + continue > + if inst.errno != errno.ECHILD: > + raise > + # no child processes at all (reaped by other waitpid()?) > + self._workerpids.clear() > + return > + if pid == 0: > + # no waitable child processes > + return > + self.ui.debug('worker process exited (pid=%d)\n' % pid) > + self._workerpids.discard(pid) > + > + def _serveworker(self, conn): I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it to mainloop (makes sense to me since it's simple and the fork logic is already there, and I think we don't care about subclass now). > + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) > + h = self._servicehandler > + _serverequest(self.ui, self.repo, conn, h.createcmdserver) > + > _servicemap = { > 'pipe': pipeservice, > - 'unix': unixservice, > + 'unix': unixforkingservice, > } > > def createservice(ui, repo, opts):
On Fri, 15 Jul 2016 16:27:56 +0100, Jun Wu wrote: > Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900: > > # HG changeset patch > > # User Yuya Nishihara <yuya@tcha.org> > > # Date 1463884998 -32400 > > # Sun May 22 11:43:18 2016 +0900 > > # Node ID e56508ea57c23af68d05d941d344321b6ade8184 > > # Parent e374627f118e8aca6243d263154aab9309abf1fc > > commandserver: add new forking server implemented without using SocketServer > > +class unixservicehandler(object): > > + """Set of pluggable operations for unix-mode services > > + > > + Almost all methods except for createcmdserver() are called in the main > > + process. You can't pass mutable resource back from createcmdserver(). > > + """ > > + > > + pollinterval = None > > + > > + def __init__(self, ui): > > + self.ui = ui > > Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui" > whenever possible. Good question. I made it keep ui because future patches will replace _log(), which is noop now, by ui.debug(). > > + def createcmdserver(self, repo, conn, fin, fout): > > And add a ui parameter here so we can remove "self.ui = ui". As "repo" is > here already, it will feel more consistent. Yep, I initially designed it to accept (ui, repo, ...), but dropped ui because of the reason above. > > +class unixforkingservice(unixservice): > > + def __init__(self, ui, repo, opts, handler=None): > > + super(unixforkingservice, self).__init__(ui, repo, opts) > > + self._servicehandler = handler or unixservicehandler(ui) > > + self._sock = None > > + self._oldsigchldhandler = None > > + self._workerpids = set() # updated by signal handler; do not iterate > > + > > + def init(self): > > + self._sock = socket.socket(socket.AF_UNIX) > > + self._servicehandler.bindsocket(self._sock, self.address) > > + self._sock.listen(5) > > Maybe make 5 a class constant, or as we have ui, read from some config > option. But that can be a later patch. A class constant seems good. I'll do that if I need to revise this patch. > > + def _serveworker(self, conn): > > I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb > and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it > to mainloop (makes sense to me since it's simple and the fork logic is > already there, and I think we don't care about subclass now). I'm going to move the initialization of the worker process to this function, which is why I avoid calling it _serverequest(). If we add a prefork server, setpgid() and random.seed() shouldn't be executed per request, so they will be moved from _serverequest() to unixforkingservice._serveworker().
Excerpts from Yuya Nishihara's message of 2016-07-16 01:02:51 +0900: > On Fri, 15 Jul 2016 16:27:56 +0100, Jun Wu wrote: > > Excerpts from Yuya Nishihara's message of 2016-07-15 00:20:41 +0900: > > > # HG changeset patch > > > # User Yuya Nishihara <yuya@tcha.org> > > > # Date 1463884998 -32400 > > > # Sun May 22 11:43:18 2016 +0900 > > > # Node ID e56508ea57c23af68d05d941d344321b6ade8184 > > > # Parent e374627f118e8aca6243d263154aab9309abf1fc > > > commandserver: add new forking server implemented without using SocketServer > > > > +class unixservicehandler(object): > > > + """Set of pluggable operations for unix-mode services > > > + > > > + Almost all methods except for createcmdserver() are called in the main > > > + process. You can't pass mutable resource back from createcmdserver(). > > > + """ > > > + > > > + pollinterval = None > > > + > > > + def __init__(self, ui): > > > + self.ui = ui > > > > Do we really need the ui object? I'm a big fan of avoiding "self.ui = ui" > > whenever possible. > > Good question. I made it keep ui because future patches will replace _log(), > which is noop now, by ui.debug(). That makes sense. It seems hard to change the architecture. Although if we really want to, we can pass the file object. > > > + def createcmdserver(self, repo, conn, fin, fout): > > > > And add a ui parameter here so we can remove "self.ui = ui". As "repo" is > > here already, it will feel more consistent. > > Yep, I initially designed it to accept (ui, repo, ...), but dropped ui because > of the reason above. > > > > +class unixforkingservice(unixservice): > > > + def __init__(self, ui, repo, opts, handler=None): > > > + super(unixforkingservice, self).__init__(ui, repo, opts) > > > + self._servicehandler = handler or unixservicehandler(ui) > > > + self._sock = None > > > + self._oldsigchldhandler = None > > > + self._workerpids = set() # updated by signal handler; do not iterate > > > + > > > + def init(self): > > > + self._sock = socket.socket(socket.AF_UNIX) > > > + self._servicehandler.bindsocket(self._sock, self.address) > > > + self._sock.listen(5) > > > > Maybe make 5 a class constant, or as we have ui, read from some config > > option. But that can be a later patch. > > A class constant seems good. I'll do that if I need to revise this patch. > > > > + def _serveworker(self, conn): > > > > I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb > > and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it > > to mainloop (makes sense to me since it's simple and the fork logic is > > already there, and I think we don't care about subclass now). > > I'm going to move the initialization of the worker process to this function, > which is why I avoid calling it _serverequest(). If we add a prefork server, > setpgid() and random.seed() shouldn't be executed per request, so they will > be moved from _serverequest() to unixforkingservice._serveworker(). Then how about "_runworker"? Anyway I don't feel strong as we don't need to care about subclassing now.
On Fri, 15 Jul 2016 17:29:50 +0100, Jun Wu wrote: > Excerpts from Yuya Nishihara's message of 2016-07-16 01:02:51 +0900: > > > > + def _serveworker(self, conn): > > > > > > I find the name "_serveworker" a bit ambiguous. "serve" is usually a verb > > > and iiuc it's used as a noun here. Maybe just use "_serverequest" or move it > > > to mainloop (makes sense to me since it's simple and the fork logic is > > > already there, and I think we don't care about subclass now). > > > > I'm going to move the initialization of the worker process to this function, > > which is why I avoid calling it _serverequest(). If we add a prefork server, > > setpgid() and random.seed() shouldn't be executed per request, so they will > > be moved from _serverequest() to unixforkingservice._serveworker(). > > Then how about "_runworker"? Anyway I don't feel strong as we don't need to > care about subclassing now. Sounds better. I'll send a follow-up patch.
Patch
diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py --- a/mercurial/commandserver.py +++ b/mercurial/commandserver.py @@ -11,6 +11,9 @@ import errno import gc import os import random +import select +import signal +import socket import struct import sys import traceback @@ -385,6 +388,41 @@ def _serverequest(ui, repo, conn, create # trigger __del__ since ForkingMixIn uses os._exit gc.collect() +class unixservicehandler(object): + """Set of pluggable operations for unix-mode services + + Almost all methods except for createcmdserver() are called in the main + process. You can't pass mutable resource back from createcmdserver(). + """ + + pollinterval = None + + def __init__(self, ui): + self.ui = ui + + def bindsocket(self, sock, address): + util.bindunixsocket(sock, address) + + def unlinksocket(self, address): + os.unlink(address) + + def printbanner(self, address): + self.ui.status(_('listening at %s\n') % address) + self.ui.flush() # avoid buffering of status message + + def shouldexit(self): + """True if server should shut down; checked per pollinterval""" + return False + + def newconnection(self): + """Called when main process notices new connection""" + pass + + def createcmdserver(self, repo, conn, fin, fout): + """Create new command server instance; called in the process that + serves for the current connection""" + return server(self.ui, repo, fin, fout) + class _requesthandler(socketserver.BaseRequestHandler): def handle(self): _serverequest(self.server.ui, self.server.repo, self.request, @@ -424,9 +462,96 @@ class unixservice(object): finally: self._cleanup() +class unixforkingservice(unixservice): + def __init__(self, ui, repo, opts, handler=None): + super(unixforkingservice, self).__init__(ui, repo, opts) + self._servicehandler = handler or unixservicehandler(ui) + self._sock = None + self._oldsigchldhandler = None + self._workerpids = set() # updated by signal handler; do not iterate + + def init(self): + self._sock = socket.socket(socket.AF_UNIX) + self._servicehandler.bindsocket(self._sock, self.address) + self._sock.listen(5) + o = signal.signal(signal.SIGCHLD, self._sigchldhandler) + self._oldsigchldhandler = o + self._servicehandler.printbanner(self.address) + + def _cleanup(self): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + self._sock.close() + self._servicehandler.unlinksocket(self.address) + # don't kill child processes as they have active clients, just wait + self._reapworkers(0) + + def run(self): + try: + self._mainloop() + finally: + self._cleanup() + + def _mainloop(self): + h = self._servicehandler + while not h.shouldexit(): + try: + ready = select.select([self._sock], [], [], h.pollinterval)[0] + if not ready: + continue + conn, _addr = self._sock.accept() + except (select.error, socket.error) as inst: + if inst.args[0] == errno.EINTR: + continue + raise + + pid = os.fork() + if pid: + try: + self.ui.debug('forked worker process (pid=%d)\n' % pid) + self._workerpids.add(pid) + h.newconnection() + finally: + conn.close() # release handle in parent process + else: + try: + self._serveworker(conn) + conn.close() + os._exit(0) + except: # never return, hence no re-raises + try: + self.ui.traceback(force=True) + finally: + os._exit(255) + + def _sigchldhandler(self, signal, frame): + self._reapworkers(os.WNOHANG) + + def _reapworkers(self, options): + while self._workerpids: + try: + pid, _status = os.waitpid(-1, options) + except OSError as inst: + if inst.errno == errno.EINTR: + continue + if inst.errno != errno.ECHILD: + raise + # no child processes at all (reaped by other waitpid()?) + self._workerpids.clear() + return + if pid == 0: + # no waitable child processes + return + self.ui.debug('worker process exited (pid=%d)\n' % pid) + self._workerpids.discard(pid) + + def _serveworker(self, conn): + signal.signal(signal.SIGCHLD, self._oldsigchldhandler) + h = self._servicehandler + _serverequest(self.ui, self.repo, conn, h.createcmdserver) + _servicemap = { 'pipe': pipeservice, - 'unix': unixservice, + 'unix': unixforkingservice, } def createservice(ui, repo, opts):