Patchwork [1,of,3,batching] batching: migrate basic noop batching into peer.peer

login
register
mail settings
Submitter Augie Fackler
Date Aug. 5, 2015, 7:29 p.m.
Message ID <9c49271fe180f287ca3b.1438802996@arthedain.pit.corp.google.com>
Download mbox | patch
Permalink /patch/10108/
State Accepted
Headers show

Comments

Augie Fackler - Aug. 5, 2015, 7:29 p.m.
# HG changeset patch
# User Augie Fackler <augie@google.com>
# Date 1438800694 14400
#      Wed Aug 05 14:51:34 2015 -0400
# Node ID 9c49271fe180f287ca3b43e07ca9b593e2f8c2ce
# Parent  d6106df97eddf20cf1a4c06dd3f0e77f9316442d
batching: migrate basic noop batching into peer.peer

"Real" batching only makes sense for wirepeers, but it greatly
simplifies the clients of peer instances if they can be ignorant to
actual batching capabilities of that peer. By moving the
not-really-batched batching code into peer.peer, all peer instances
now work with the batching API, thus simplifying users.

This leaves a couple of name forwards in wirepeer.py. Originally I had
planned to clean those up, but it kind of unclarifies other bits of
code that want to use batching, so I think it makes sense for the
names to stay exposed by wireproto. Specifically, almost nothing is
currently aware of peer (see largefiles.proto for an example), so
making them be aware of the peer module *and* the wireproto module
seems like some abstraction leakage. I *think* the right long-term fix
would actually be to make wireproto an implementation detail that
clients wouldn't need to know about, but I don't really know what that
would entail at the moment.

As far as I'm aware, no clients of batching in third-party extensions
will need updating, which is nice icing.

Patch

diff --git a/mercurial/peer.py b/mercurial/peer.py
--- a/mercurial/peer.py
+++ b/mercurial/peer.py
@@ -8,9 +8,85 @@ 
 
 from i18n import _
 import error
+import util
+
+# abstract batching support
+
+class future(object):
+    '''placeholder for a value to be set later'''
+    def set(self, value):
+        if util.safehasattr(self, 'value'):
+            raise error.RepoError("future is already set")
+        self.value = value
+
+class batcher(object):
+    '''base class for batches of commands submittable in a single request
+
+    All methods invoked on instances of this class are simply queued and
+    return a a future for the result. Once you call submit(), all the queued
+    calls are performed and the results set in their respective futures.
+    '''
+    def __init__(self):
+        self.calls = []
+    def __getattr__(self, name):
+        def call(*args, **opts):
+            resref = future()
+            self.calls.append((name, args, opts, resref,))
+            return resref
+        return call
+    def submit(self):
+        pass
+
+class localbatch(batcher):
+    '''performs the queued calls directly'''
+    def __init__(self, local):
+        batcher.__init__(self)
+        self.local = local
+    def submit(self):
+        for name, args, opts, resref in self.calls:
+            resref.set(getattr(self.local, name)(*args, **opts))
+
+def batchable(f):
+    '''annotation for batchable methods
+
+    Such methods must implement a coroutine as follows:
+
+    @batchable
+    def sample(self, one, two=None):
+        # Handle locally computable results first:
+        if not one:
+            yield "a local result", None
+        # Build list of encoded arguments suitable for your wire protocol:
+        encargs = [('one', encode(one),), ('two', encode(two),)]
+        # Create future for injection of encoded result:
+        encresref = future()
+        # Return encoded arguments and future:
+        yield encargs, encresref
+        # Assuming the future to be filled with the result from the batched
+        # request now. Decode it:
+        yield decode(encresref.value)
+
+    The decorator returns a function which wraps this coroutine as a plain
+    method, but adds the original method as an attribute called "batchable",
+    which is used by remotebatch to split the call into separate encoding and
+    decoding phases.
+    '''
+    def plain(*args, **opts):
+        batchable = f(*args, **opts)
+        encargsorres, encresref = batchable.next()
+        if not encresref:
+            return encargsorres # a local result in this case
+        self = args[0]
+        encresref.set(self._submitone(f.func_name, encargsorres))
+        return batchable.next()
+    setattr(plain, 'batchable', f)
+    return plain
 
 class peerrepository(object):
 
+    def batch(self):
+        return localbatch(self)
+
     def capable(self, name):
         '''tell whether repo supports named capability.
         return False if not supported.
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -58,48 +58,12 @@  class abstractserverproto(object):
         Some protocols may have compressed the contents."""
         raise NotImplementedError()
 
-# abstract batching support
-
-class future(object):
-    '''placeholder for a value to be set later'''
-    def set(self, value):
-        if util.safehasattr(self, 'value'):
-            raise error.RepoError("future is already set")
-        self.value = value
-
-class batcher(object):
-    '''base class for batches of commands submittable in a single request
-
-    All methods invoked on instances of this class are simply queued and
-    return a a future for the result. Once you call submit(), all the queued
-    calls are performed and the results set in their respective futures.
-    '''
-    def __init__(self):
-        self.calls = []
-    def __getattr__(self, name):
-        def call(*args, **opts):
-            resref = future()
-            self.calls.append((name, args, opts, resref,))
-            return resref
-        return call
-    def submit(self):
-        pass
-
-class localbatch(batcher):
-    '''performs the queued calls directly'''
-    def __init__(self, local):
-        batcher.__init__(self)
-        self.local = local
-    def submit(self):
-        for name, args, opts, resref in self.calls:
-            resref.set(getattr(self.local, name)(*args, **opts))
-
-class remotebatch(batcher):
+class remotebatch(peer.batcher):
     '''batches the queued calls; uses as few roundtrips as possible'''
     def __init__(self, remote):
         '''remote must support _submitbatch(encbatch) and
         _submitone(op, encargs)'''
-        batcher.__init__(self)
+        peer.batcher.__init__(self)
         self.remote = remote
     def submit(self):
         req, rsp = [], []
@@ -128,41 +92,10 @@  class remotebatch(batcher):
             encresref.set(encres)
             resref.set(batchable.next())
 
-def batchable(f):
-    '''annotation for batchable methods
-
-    Such methods must implement a coroutine as follows:
-
-    @batchable
-    def sample(self, one, two=None):
-        # Handle locally computable results first:
-        if not one:
-            yield "a local result", None
-        # Build list of encoded arguments suitable for your wire protocol:
-        encargs = [('one', encode(one),), ('two', encode(two),)]
-        # Create future for injection of encoded result:
-        encresref = future()
-        # Return encoded arguments and future:
-        yield encargs, encresref
-        # Assuming the future to be filled with the result from the batched
-        # request now. Decode it:
-        yield decode(encresref.value)
-
-    The decorator returns a function which wraps this coroutine as a plain
-    method, but adds the original method as an attribute called "batchable",
-    which is used by remotebatch to split the call into separate encoding and
-    decoding phases.
-    '''
-    def plain(*args, **opts):
-        batchable = f(*args, **opts)
-        encargsorres, encresref = batchable.next()
-        if not encresref:
-            return encargsorres # a local result in this case
-        self = args[0]
-        encresref.set(self._submitone(f.func_name, encargsorres))
-        return batchable.next()
-    setattr(plain, 'batchable', f)
-    return plain
+# Forward a couple of names from peer to make wireproto interactions
+# slightly more sensible.
+batchable = peer.batchable
+future = peer.future
 
 # list of nodes encoding / decoding
 
diff --git a/tests/test-batching.py b/tests/test-batching.py
--- a/tests/test-batching.py
+++ b/tests/test-batching.py
@@ -5,7 +5,8 @@ 
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
-from mercurial.wireproto import localbatch, remotebatch, batchable, future
+from mercurial.peer import localbatch, batchable, future
+from mercurial.wireproto import remotebatch
 
 # equivalent of repo.repository
 class thing(object):
diff --git a/tests/test-wireproto.py b/tests/test-wireproto.py
--- a/tests/test-wireproto.py
+++ b/tests/test-wireproto.py
@@ -12,6 +12,10 @@  class proto(object):
 class clientpeer(wireproto.wirepeer):
     def __init__(self, serverrepo):
         self.serverrepo = serverrepo
+
+    def _capabilities(self):
+        return ['batch']
+
     def _call(self, cmd, **args):
         return wireproto.dispatch(self.serverrepo, proto(args), cmd)