Patchwork [5,of,7,iterbatch] wireproto: make iterbatcher behave streamily over http(s)

login
register
mail settings
Submitter Augie Fackler
Date March 8, 2016, 4:25 a.m.
Message ID <645ef49fe63b02b76111.1457411141@147.17.16.172.in-addr.arpa>
Download mbox | patch
Permalink /patch/13661/
State Accepted
Delegated to: Pierre-Yves David
Headers show

Comments

Augie Fackler - March 8, 2016, 4:25 a.m.
# HG changeset patch
# User Augie Fackler <augie@google.com>
# Date 1456875703 18000
#      Tue Mar 01 18:41:43 2016 -0500
# Node ID 645ef49fe63b02b76111ca3064d8e512de89692d
# Parent  daf0fd2b309cfa46fa5911ed988ad2dad945e9b0
# EXP-Topic batch
wireproto: make iterbatcher behave streamily over http(s)

Unfortunately, the ssh and http implementations are slightly different
because their _callstream implementations differ, which prevents ssh
from behaving streamily. We should probably introduce a new batch
command that can stream results over ssh at some point in the near
future.

The streamy behavior of batch over http(s) is an enormous win for
remotefilelog over http: in my testing, it's saving about 40% on file
fetches with a cold cache against a server on localhost.

Patch

diff --git a/mercurial/sshpeer.py b/mercurial/sshpeer.py
--- a/mercurial/sshpeer.py
+++ b/mercurial/sshpeer.py
@@ -231,6 +231,31 @@  class sshpeer(wireproto.wirepeer):
 
     __del__ = cleanup
 
+    def _submitbatch(self, req):
+        cmds = []
+        for op, argsdict in req:
+            args = ','.join('%s=%s' % (wireproto.escapearg(k),
+                                       wireproto.escapearg(v))
+                            for k, v in argsdict.iteritems())
+            cmds.append('%s %s' % (op, args))
+        rsp = self._callstream("batch", cmds=';'.join(cmds))
+        available = self._getamount()
+        # TODO this response parsing is probably suboptimal for large
+        # batches with large responses.
+        toread = min(available, 1024)
+        work = rsp.read(toread)
+        available -= toread
+        chunk = work
+        while chunk:
+            while ';' in work:
+                one, work = work.split(';', 1)
+                yield wireproto.unescapearg(one)
+            toread = min(available, 1024)
+            chunk = rsp.read(toread)
+            available -= toread
+            work += chunk
+        yield wireproto.unescapearg(work)
+
     def _callstream(self, cmd, **args):
         self.ui.debug("sending %s command\n" % cmd)
         self.pipeo.write("%s\n" % cmd)
@@ -291,7 +316,7 @@  class sshpeer(wireproto.wirepeer):
         self._send("", flush=True)
         return self.pipei
 
-    def _recv(self):
+    def _getamount(self):
         l = self.pipei.readline()
         if l == '\n':
             self.readerr()
@@ -299,10 +324,12 @@  class sshpeer(wireproto.wirepeer):
             self._abort(error.OutOfBandError(hint=msg))
         self.readerr()
         try:
-            l = int(l)
+            return int(l)
         except ValueError:
             self._abort(error.ResponseError(_("unexpected response:"), l))
-        return self.pipei.read(l)
+
+    def _recv(self):
+        return self.pipei.read(self._getamount())
 
     def _send(self, data, flush=False):
         self.pipeo.write("%d\n" % len(data))
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -7,6 +7,7 @@ 
 
 from __future__ import absolute_import
 
+import itertools
 import os
 import sys
 import tempfile
@@ -119,19 +120,35 @@  class remoteiterbatcher(peer.iterbatcher
         super(remoteiterbatcher, self).__init__()
         self._remote = remote
 
+    def __getattr__(self, name):
+        if not getattr(self._remote, name, False):
+            raise AttributeError(
+                'Attempted to iterbatch non-batchable call to %r' % name)
+        return super(remoteiterbatcher, self).__getattr__(name)
+
     def submit(self):
         """Break the batch request into many patch calls and pipeline them.
 
         This is mostly valuable over http where request sizes can be
         limited, but can be used in other places as well.
         """
-        rb = self._remote.batch()
-        rb.calls = self.calls
-        rb.submit()
+        req, rsp = [], []
+        for name, args, opts, resref in self.calls:
+            mtd = getattr(self._remote, name)
+            batchable = mtd.batchable(mtd.im_self, *args, **opts)
+            encargsorres, encresref = batchable.next()
+            assert encresref
+            req.append((name, encargsorres))
+            rsp.append((batchable, encresref))
+        if req:
+            self._resultiter = self._remote._submitbatch(req)
+        self._rsp = rsp
 
     def results(self):
-        for name, args, opts, resref in self.calls:
-            yield resref.value
+        for (batchable, encresref), encres in itertools.izip(
+                self._rsp, self._resultiter):
+            encresref.set(encres)
+            yield batchable.next()
 
 # Forward a couple of names from peer to make wireproto interactions
 # slightly more sensible.
@@ -202,13 +219,28 @@  class wirepeer(peer.peerrepository):
         else:
             return peer.localbatch(self)
     def _submitbatch(self, req):
+        """run batch request <req> on the server
+
+        Returns an iterator of the raw responses from the server.
+        """
         cmds = []
         for op, argsdict in req:
             args = ','.join('%s=%s' % (escapearg(k), escapearg(v))
                             for k, v in argsdict.iteritems())
             cmds.append('%s %s' % (op, args))
-        rsp = self._call("batch", cmds=';'.join(cmds))
-        return [unescapearg(r) for r in rsp.split(';')]
+        rsp = self._callstream("batch", cmds=';'.join(cmds))
+        # TODO this response parsing is probably suboptimal for large
+        # batches with large responses.
+        work = rsp.read(1024)
+        chunk = work
+        while chunk:
+            while ';' in work:
+                one, work = work.split(';', 1)
+                yield unescapearg(one)
+            chunk = rsp.read(1024)
+            work += chunk
+        yield unescapearg(work)
+
     def _submitone(self, op, args):
         return self._call(op, **args)
 
diff --git a/tests/test-wireproto.py b/tests/test-wireproto.py
--- a/tests/test-wireproto.py
+++ b/tests/test-wireproto.py
@@ -1,5 +1,7 @@ 
 from __future__ import absolute_import
 
+import StringIO
+
 from mercurial import wireproto
 
 class proto(object):
@@ -21,6 +23,9 @@  class clientpeer(wireproto.wirepeer):
     def _call(self, cmd, **args):
         return wireproto.dispatch(self.serverrepo, proto(args), cmd)
 
+    def _callstream(self, cmd, **args):
+        return StringIO.StringIO(self._call(cmd, **args))
+
     @wireproto.batchable
     def greet(self, name):
         f = wireproto.future()