Patchwork D319: wireproto: overhaul iterating batcher code (API)

login
register
mail settings
Submitter phabricator
Date Aug. 11, 2017, 6:52 p.m.
Message ID <5644a52d524c38bb9b975de0d360ff50@localhost.localdomain>
Download mbox | patch
Permalink /patch/22893/
State Not Applicable
Headers show

Comments

phabricator - Aug. 11, 2017, 6:52 p.m.
This revision was automatically updated to reflect the committed changes.
Closed by commit rHG4c706037adef: wireproto: overhaul iterating batcher code (API) (authored by indygreg).

REPOSITORY
  rHG Mercurial

CHANGES SINCE LAST UPDATE
  https://phab.mercurial-scm.org/D319?vs=734&id=793

REVISION DETAIL
  https://phab.mercurial-scm.org/D319

AFFECTED FILES
  mercurial/peer.py
  mercurial/wireproto.py
  tests/test-batching.py
  tests/test-batching.py.out

CHANGE DETAILS




To: indygreg, #hg-reviewers
Cc: mercurial-devel

Patch

diff --git a/tests/test-batching.py.out b/tests/test-batching.py.out
--- a/tests/test-batching.py.out
+++ b/tests/test-batching.py.out
@@ -5,9 +5,8 @@ 
 Eins und Zwei
 One and Two
 Eins und Zwei
-Hello, John Smith
-Ready.
 Uno und Due
+proper end of results generator
 
 == Remote
 Ready.
@@ -17,14 +16,11 @@ 
 REQ: bar?b=Fjot&a=[xfj
   -> Fjot!voe![xfj
 Eins und Zwei
-REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj
-  -> Pof!boe!Uxp;Fjot!voe![xfj
-REQ: greet?name=Kpio!Tnjui
-  -> Ifmmp-!Kpio!Tnjui
-REQ: batch?cmds=bar:b=Vop,a=Evf
-  -> Vop!voe!Evf
+REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj;bar:b=Vop,a=Evf
+  -> Pof!boe!Uxp;Fjot!voe![xfj;Vop!voe!Evf
 One and Two
 Eins und Zwei
-Hello, John Smith
-Ready.
 Uno und Due
+proper end of results generator
+Attempted to batch a non-batchable call to 'greet'
+Attempted to batch a non-batchable call to 'hello'
diff --git a/tests/test-batching.py b/tests/test-batching.py
--- a/tests/test-batching.py
+++ b/tests/test-batching.py
@@ -8,7 +8,9 @@ 
 from __future__ import absolute_import, print_function
 
 from mercurial import (
+    error,
     peer,
+    util,
     wireproto,
 )
 
@@ -27,9 +29,9 @@ 
         return "%s und %s" % (b, a,)
     def greet(self, name=None):
         return "Hello, %s" % name
-    def batch(self):
+    def batchiter(self):
         '''Support for local batching.'''
-        return peer.localbatch(self)
+        return peer.localiterbatcher(self)
 
 # usage of "thing" interface
 def use(it):
@@ -41,27 +43,54 @@ 
     print(it.foo("Un", two="Deux"))
     print(it.bar("Eins", "Zwei"))
 
-    # Batched call to a couple of (possibly proxied) methods.
-    batch = it.batch()
+    # Batched call to a couple of proxied methods.
+    batch = it.batchiter()
     # The calls return futures to eventually hold results.
     foo = batch.foo(one="One", two="Two")
     bar = batch.bar("Eins", "Zwei")
-    # We can call non-batchable proxy methods, but the break the current batch
-    # request and cause additional roundtrips.
-    greet = batch.greet(name="John Smith")
-    # We can also add local methods into the mix, but they break the batch too.
-    hello = batch.hello()
     bar2 = batch.bar(b="Uno", a="Due")
-    # Only now are all the calls executed in sequence, with as few roundtrips
-    # as possible.
+
+    # Future shouldn't be set until we submit().
+    assert isinstance(foo, peer.future)
+    assert not util.safehasattr(foo, 'value')
+    assert not util.safehasattr(bar, 'value')
     batch.submit()
-    # After the call to submit, the futures actually contain values.
+    # Call results() to obtain results as a generator.
+    results = batch.results()
+
+    # Future results shouldn't be set until we consume a value.
+    assert not util.safehasattr(foo, 'value')
+    foovalue = next(results)
+    assert util.safehasattr(foo, 'value')
+    assert foovalue == foo.value
     print(foo.value)
+    next(results)
     print(bar.value)
-    print(greet.value)
-    print(hello.value)
+    next(results)
     print(bar2.value)
 
+    # We should be at the end of the results generator.
+    try:
+        next(results)
+    except StopIteration:
+        print('proper end of results generator')
+    else:
+        print('extra emitted element!')
+
+    # Attempting to call a non-batchable method inside a batch fails.
+    batch = it.batchiter()
+    try:
+        batch.greet(name='John Smith')
+    except error.ProgrammingError as e:
+        print(e)
+
+    # Attempting to call a local method inside a batch fails.
+    batch = it.batchiter()
+    try:
+        batch.hello()
+    except error.ProgrammingError as e:
+        print(e)
+
 # local usage
 mylocal = localthing()
 print()
@@ -144,10 +173,11 @@ 
             req.append(name + ':' + args)
         req = ';'.join(req)
         res = self._submitone('batch', [('cmds', req,)])
-        return res.split(';')
+        for r in res.split(';'):
+            yield r
 
-    def batch(self):
-        return wireproto.remotebatch(self)
+    def batchiter(self):
+        return wireproto.remoteiterbatcher(self)
 
     @peer.batchable
     def foo(self, one, two=None):
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -133,23 +133,47 @@ 
         This is mostly valuable over http where request sizes can be
         limited, but can be used in other places as well.
         """
-        req, rsp = [], []
-        for name, args, opts, resref in self.calls:
-            mtd = getattr(self._remote, name)
+        # 2-tuple of (command, arguments) that represents what will be
+        # sent over the wire.
+        requests = []
+
+        # 4-tuple of (command, final future, @batchable generator, remote
+        # future).
+        results = []
+
+        for command, args, opts, finalfuture in self.calls:
+            mtd = getattr(self._remote, command)
             batchable = mtd.batchable(mtd.im_self, *args, **opts)
-            encargsorres, encresref = next(batchable)
-            assert encresref
-            req.append((name, encargsorres))
-            rsp.append((batchable, encresref))
-        if req:
-            self._resultiter = self._remote._submitbatch(req)
-        self._rsp = rsp
+
+            commandargs, fremote = next(batchable)
+            assert fremote
+            requests.append((command, commandargs))
+            results.append((command, finalfuture, batchable, fremote))
+
+        if requests:
+            self._resultiter = self._remote._submitbatch(requests)
+
+        self._results = results
 
     def results(self):
-        for (batchable, encresref), encres in itertools.izip(
-                self._rsp, self._resultiter):
-            encresref.set(encres)
-            yield next(batchable)
+        for command, finalfuture, batchable, remotefuture in self._results:
+            # Get the raw result, set it in the remote future, feed it
+            # back into the @batchable generator so it can be decoded, and
+            # set the result on the final future to this value.
+            remoteresult = next(self._resultiter)
+            remotefuture.set(remoteresult)
+            finalfuture.set(next(batchable))
+
+            # Verify our @batchable generators only emit 2 values.
+            try:
+                next(batchable)
+            except StopIteration:
+                pass
+            else:
+                raise error.ProgrammingError('%s @batchable generator emitted '
+                                             'unexpected value count' % command)
+
+            yield finalfuture.value
 
 # Forward a couple of names from peer to make wireproto interactions
 # slightly more sensible.
diff --git a/mercurial/peer.py b/mercurial/peer.py
--- a/mercurial/peer.py
+++ b/mercurial/peer.py
@@ -69,7 +69,8 @@ 
 
     def results(self):
         for name, args, opts, resref in self.calls:
-            yield getattr(self.local, name)(*args, **opts)
+            resref.set(getattr(self.local, name)(*args, **opts))
+            yield resref.value
 
 def batchable(f):
     '''annotation for batchable methods