Patchwork [1,of,2,RESEND] commandserver: add IPC channel to teach repository path on command finished

login
register
mail settings
Submitter Yuya Nishihara
Date Dec. 11, 2018, 1:12 p.m.
Message ID <96ba2adf8adbc016f1d4.1544533923@mimosa>
Download mbox | patch
Permalink /patch/37107/
State New
Headers show

Comments

Yuya Nishihara - Dec. 11, 2018, 1:12 p.m.
# HG changeset patch
# User Yuya Nishihara <yuya@tcha.org>
# Date 1540991943 -32400
#      Wed Oct 31 22:19:03 2018 +0900
# Node ID 96ba2adf8adbc016f1d4db03ab61429ad9618569
# Parent  4e17679c336bc38709c32d9b8972f0e402e0057a
commandserver: add IPC channel to teach repository path on command finished

The idea is to load recently-used repositories first in the master process,
and fork(). The forked worker can reuse a warm repository if it's preloaded.

There are a couple of ways of in-memory repository caching. They have pros
and cons:

 a. "preload by master"
    pros: can use a single cache dict, maximizing cache hit rate
    cons: need to reload a repo in master process (because worker process
          dies per command)
 b. "prefork"
    pros: can cache a repo without reloading (as worker processes persist)
    cons: lower cache hit rate since each worker has to maintain its own cache
 c. "shared memory" (or separate key-value store server)
    pros: no need to reload a repo in master process, ideally
    cons: need to serialize objects to sharable form

Since my primary goal is to get rid of the cost of loading obsstore without
massive rewrites, (c) doesn't work. (b) isn't ideal since it would require
much more SDRAMs than (a). So I take (a).

The idea credits to Jun Wu.

Patch

diff --git a/mercurial/commandserver.py b/mercurial/commandserver.py
--- a/mercurial/commandserver.py
+++ b/mercurial/commandserver.py
@@ -506,12 +506,19 @@  class unixforkingservice(object):
             raise error.Abort(_('no socket path specified with --address'))
         self._servicehandler = handler or unixservicehandler(ui)
         self._sock = None
+        self._mainipc = None
+        self._workeripc = None
         self._oldsigchldhandler = None
         self._workerpids = set()  # updated by signal handler; do not iterate
         self._socketunlinked = None
 
     def init(self):
         self._sock = socket.socket(socket.AF_UNIX)
+        # IPC channel from many workers to one main process; this is actually
+        # a uni-directional pipe, but is backed by a DGRAM socket so each
+        # message can be easily separated.
+        o = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
+        self._mainipc, self._workeripc = o
         self._servicehandler.bindsocket(self._sock, self.address)
         if util.safehasattr(procutil, 'unblocksignal'):
             procutil.unblocksignal(signal.SIGCHLD)
@@ -527,6 +534,8 @@  class unixforkingservice(object):
     def _cleanup(self):
         signal.signal(signal.SIGCHLD, self._oldsigchldhandler)
         self._sock.close()
+        self._mainipc.close()
+        self._workeripc.close()
         self._unlinksocket()
         # don't kill child processes as they have active clients, just wait
         self._reapworkers(0)
@@ -543,6 +552,8 @@  class unixforkingservice(object):
         selector = selectors.DefaultSelector()
         selector.register(self._sock, selectors.EVENT_READ,
                           self._acceptnewconnection)
+        selector.register(self._mainipc, selectors.EVENT_READ,
+                          self._handlemainipc)
         while True:
             if not exiting and h.shouldexit():
                 # clients can no longer connect() to the domain socket, so
@@ -592,8 +603,10 @@  class unixforkingservice(object):
             try:
                 selector.close()
                 sock.close()
+                self._mainipc.close()
                 self._runworker(conn)
                 conn.close()
+                self._workeripc.close()
                 os._exit(0)
             except:  # never return, hence no re-raises
                 try:
@@ -601,6 +614,17 @@  class unixforkingservice(object):
                 finally:
                     os._exit(255)
 
+    def _handlemainipc(self, sock, selector):
+        """Process messages sent from a worker"""
+        try:
+            path = sock.recv(32768)  # large enough to receive path
+        except socket.error as inst:
+            if inst.args[0] == errno.EINTR:
+                return
+            raise
+
+        self.ui.log(b'cmdserver', b'repository: %s\n', path)
+
     def _sigchldhandler(self, signal, frame):
         self._reapworkers(os.WNOHANG)
 
@@ -628,6 +652,22 @@  class unixforkingservice(object):
         h = self._servicehandler
         try:
             _serverequest(self.ui, self.repo, conn, h.createcmdserver,
-                          prereposetups=None)  # TODO: pass in hook functions
+                          prereposetups=[self._reposetup])
         finally:
             gc.collect()  # trigger __del__ since worker process uses os._exit
+
+    def _reposetup(self, ui, repo):
+        if not repo.local():
+            return
+
+        class unixcmdserverrepo(repo.__class__):
+            def close(self):
+                super(unixcmdserverrepo, self).close()
+                try:
+                    self._cmdserveripc.send(self.root)
+                except socket.error:
+                    self.ui.log(b'cmdserver',
+                                b'failed to send repo root to master\n')
+
+        repo.__class__ = unixcmdserverrepo
+        repo._cmdserveripc = self._workeripc
diff --git a/tests/test-chg.t b/tests/test-chg.t
--- a/tests/test-chg.t
+++ b/tests/test-chg.t
@@ -230,7 +230,6 @@  print only the last 10 lines, since we a
 preserved:
 
   $ cat log/server.log.1 log/server.log | tail -10 | filterlog
-  YYYY/MM/DD HH:MM:SS (PID)> forked worker process (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> setprocname: ...
   YYYY/MM/DD HH:MM:SS (PID)> received fds: ...
   YYYY/MM/DD HH:MM:SS (PID)> chdir to '$TESTTMP/extreload'
@@ -238,5 +237,6 @@  preserved:
   YYYY/MM/DD HH:MM:SS (PID)> setenv: ...
   YYYY/MM/DD HH:MM:SS (PID)> confighash = ... mtimehash = ...
   YYYY/MM/DD HH:MM:SS (PID)> validate: []
+  YYYY/MM/DD HH:MM:SS (PID)> repository: $TESTTMP/extreload
   YYYY/MM/DD HH:MM:SS (PID)> worker process exited (pid=...)
   YYYY/MM/DD HH:MM:SS (PID)> $TESTTMP/extreload/chgsock/server-... is not owned, exiting.