@@ -5,9 +5,8 @@
Eins und Zwei
One and Two
Eins und Zwei
-Hello, John Smith
-Ready.
Uno und Due
+proper end of results generator
== Remote
Ready.
@@ -17,14 +16,11 @@
REQ: bar?b=Fjot&a=[xfj
-> Fjot!voe![xfj
Eins und Zwei
-REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj
- -> Pof!boe!Uxp;Fjot!voe![xfj
-REQ: greet?name=Kpio!Tnjui
- -> Ifmmp-!Kpio!Tnjui
-REQ: batch?cmds=bar:b=Vop,a=Evf
- -> Vop!voe!Evf
+REQ: batch?cmds=foo:one=Pof,two=Uxp;bar:b=Fjot,a=[xfj;bar:b=Vop,a=Evf
+ -> Pof!boe!Uxp;Fjot!voe![xfj;Vop!voe!Evf
One and Two
Eins und Zwei
-Hello, John Smith
-Ready.
Uno und Due
+proper end of results generator
+Attempted to batch a non-batchable call to 'greet'
+Attempted to batch a non-batchable call to 'hello'
@@ -8,7 +8,9 @@
from __future__ import absolute_import, print_function
from mercurial import (
+ error,
peer,
+ util,
wireproto,
)
@@ -27,9 +29,9 @@
return "%s und %s" % (b, a,)
def greet(self, name=None):
return "Hello, %s" % name
- def batch(self):
+ def batchiter(self):
'''Support for local batching.'''
- return peer.localbatch(self)
+ return peer.localiterbatcher(self)
# usage of "thing" interface
def use(it):
@@ -41,27 +43,54 @@
print(it.foo("Un", two="Deux"))
print(it.bar("Eins", "Zwei"))
- # Batched call to a couple of (possibly proxied) methods.
- batch = it.batch()
+ # Batched call to a couple of proxied methods.
+ batch = it.batchiter()
# The calls return futures to eventually hold results.
foo = batch.foo(one="One", two="Two")
bar = batch.bar("Eins", "Zwei")
- # We can call non-batchable proxy methods, but the break the current batch
- # request and cause additional roundtrips.
- greet = batch.greet(name="John Smith")
- # We can also add local methods into the mix, but they break the batch too.
- hello = batch.hello()
bar2 = batch.bar(b="Uno", a="Due")
- # Only now are all the calls executed in sequence, with as few roundtrips
- # as possible.
+
+ # Future shouldn't be set until we submit().
+ assert isinstance(foo, peer.future)
+ assert not util.safehasattr(foo, 'value')
+ assert not util.safehasattr(bar, 'value')
batch.submit()
- # After the call to submit, the futures actually contain values.
+ # Call results() to obtain results as a generator.
+ results = batch.results()
+
+ # Future results shouldn't be set until we consume a value.
+ assert not util.safehasattr(foo, 'value')
+ foovalue = next(results)
+ assert util.safehasattr(foo, 'value')
+ assert foovalue == foo.value
print(foo.value)
+ next(results)
print(bar.value)
- print(greet.value)
- print(hello.value)
+ next(results)
print(bar2.value)
+ # We should be at the end of the results generator.
+ try:
+ next(results)
+ except StopIteration:
+ print('proper end of results generator')
+ else:
+ print('extra emitted element!')
+
+ # Attempting to call a non-batchable method inside a batch fails.
+ batch = it.batchiter()
+ try:
+ batch.greet(name='John Smith')
+ except error.ProgrammingError as e:
+ print(e)
+
+ # Attempting to call a local method inside a batch fails.
+ batch = it.batchiter()
+ try:
+ batch.hello()
+ except error.ProgrammingError as e:
+ print(e)
+
# local usage
mylocal = localthing()
print()
@@ -144,10 +173,11 @@
req.append(name + ':' + args)
req = ';'.join(req)
res = self._submitone('batch', [('cmds', req,)])
- return res.split(';')
+ for r in res.split(';'):
+ yield r
- def batch(self):
- return wireproto.remotebatch(self)
+ def batchiter(self):
+ return wireproto.remoteiterbatcher(self)
@peer.batchable
def foo(self, one, two=None):
@@ -133,23 +133,47 @@
This is mostly valuable over http where request sizes can be
limited, but can be used in other places as well.
"""
- req, rsp = [], []
- for name, args, opts, resref in self.calls:
- mtd = getattr(self._remote, name)
+ # 2-tuple of (command, arguments) that represents what will be
+ # sent over the wire.
+ requests = []
+
+ # 4-tuple of (command, final future, @batchable generator, remote
+ # future).
+ results = []
+
+ for command, args, opts, finalfuture in self.calls:
+ mtd = getattr(self._remote, command)
batchable = mtd.batchable(mtd.im_self, *args, **opts)
- encargsorres, encresref = next(batchable)
- assert encresref
- req.append((name, encargsorres))
- rsp.append((batchable, encresref))
- if req:
- self._resultiter = self._remote._submitbatch(req)
- self._rsp = rsp
+
+ commandargs, fremote = next(batchable)
+ assert fremote
+ requests.append((command, commandargs))
+ results.append((command, finalfuture, batchable, fremote))
+
+ if requests:
+ self._resultiter = self._remote._submitbatch(requests)
+
+ self._results = results
def results(self):
- for (batchable, encresref), encres in itertools.izip(
- self._rsp, self._resultiter):
- encresref.set(encres)
- yield next(batchable)
+ for command, finalfuture, batchable, remotefuture in self._results:
+ # Get the raw result, set it in the remote future, feed it
+ # back into the @batchable generator so it can be decoded, and
+ # set the result on the final future to this value.
+ remoteresult = next(self._resultiter)
+ remotefuture.set(remoteresult)
+ finalfuture.set(next(batchable))
+
+ # Verify our @batchable generators only emit 2 values.
+ try:
+ next(batchable)
+ except StopIteration:
+ pass
+ else:
+ raise error.ProgrammingError('%s @batchable generator emitted '
+ 'unexpected value count' % command)
+
+ yield finalfuture.value
# Forward a couple of names from peer to make wireproto interactions
# slightly more sensible.
@@ -69,7 +69,8 @@
def results(self):
for name, args, opts, resref in self.calls:
- yield getattr(self.local, name)(*args, **opts)
+ resref.set(getattr(self.local, name)(*args, **opts))
+ yield resref.value
def batchable(f):
'''annotation for batchable methods