Patchwork [3,of,7,iterbatch] peer: add an iterbatcher interface

Submitter Augie Fackler
Date March 8, 2016, 4:25 a.m.
Message ID <409accaf401bbad8a3a3.1457411139@147.17.16.172.in-addr.arpa>
Permalink /patch/13659/
State Accepted
Delegated to: Pierre-Yves David

Comments

Augie Fackler - March 8, 2016, 4:25 a.m.
# HG changeset patch
# User Augie Fackler <augie@google.com>
# Date 1456875565 18000
#      Tue Mar 01 18:39:25 2016 -0500
# Node ID 409accaf401bbad8a3a3d1523bb455a02f6a4560
# Parent  071d05cce9156434dee4696287119f284631701b
# EXP-Topic batch
peer: add an iterbatcher interface

This is very much like ordinary batch(), but it will let me add a mode
for batch where we have pathologically large requests which are then
handled streamily. This will be a significant improvement for things
like remotefilelog, which may want to request thousands of entities at
once.
Martin von Zweigbergk - March 9, 2016, 12:03 a.m.
On Mon, Mar 7, 2016 at 8:25 PM, Augie Fackler <raf@durin42.com> wrote:
> # HG changeset patch
> # User Augie Fackler <augie@google.com>
> # Date 1456875565 18000
> #      Tue Mar 01 18:39:25 2016 -0500
> # Node ID 409accaf401bbad8a3a3d1523bb455a02f6a4560
> # Parent  071d05cce9156434dee4696287119f284631701b
> # EXP-Topic batch
> peer: add an iterbatcher interface
>
> This is very much like ordinary batch(), but it will let me add a mode
> for batch where we have pathologically large requests which are then
> handled streamily. This will be a significant improvement for things
> like remotefilelog, which may want to request thousands of entities at
> once.

Is there any reason to use the non-streaming batch after this series?
It doesn't seem to be removed in this series. Is that because there
are still reasons to use it? Or because you're being nice to
extensions?

Augie Fackler - March 9, 2016, 1:03 a.m.
> On Mar 8, 2016, at 7:03 PM, Martin von Zweigbergk <martinvonz@google.com> wrote:
> 
>> peer: add an iterbatcher interface
>> 
>> This is very much like ordinary batch(), but it will let me add a mode
>> for batch where we have pathologically large requests which are then
>> handled streamily. This will be a significant improvement for things
>> like remotefilelog, which may want to request thousands of entities at
>> once.
> 
> Is there any reason to use the non-streaming batch after this series?
> It doesn't seem to be removed in this series. Is that because there
> are still reasons to use it? Or because you're being nice to
> extensions?

Mostly sloth. A next round of patches (which I don’t *need* to have exist, but I’ll probably do anyway) would deprecate the batch() interface and replace it with a streambatch() interface, and then iterbatcher could become the only way to batch requests. I haven’t done that work yet because it’s not on my critical path.

Patch

diff --git a/mercurial/peer.py b/mercurial/peer.py
--- a/mercurial/peer.py
+++ b/mercurial/peer.py
@@ -41,6 +41,14 @@  class batcher(object):
     def submit(self):
         raise NotImplementedError()
 
+class iterbatcher(batcher):
+
+    def submit(self):
+        raise NotImplementedError()
+
+    def results(self):
+        raise NotImplementedError()
+
 class localbatch(batcher):
     '''performs the queued calls directly'''
     def __init__(self, local):
@@ -50,6 +58,19 @@  class localbatch(batcher):
         for name, args, opts, resref in self.calls:
             resref.set(getattr(self.local, name)(*args, **opts))
 
+class localiterbatcher(iterbatcher):
+    def __init__(self, local):
+        super(localiterbatcher, self).__init__()
+        self.local = local
+
+    def submit(self):
+        # submit for a local iter batcher is a noop
+        pass
+
+    def results(self):
+        for name, args, opts, resref in self.calls:
+            yield getattr(self.local, name)(*args, **opts)
+
 def batchable(f):
     '''annotation for batchable methods
 
@@ -91,6 +112,14 @@  class peerrepository(object):
     def batch(self):
         return localbatch(self)
 
+    def iterbatch(self):
+        """Batch requests but allow iterating over the results.
+
+        This is to allow interleaving responses with things like
+        progress updates for clients.
+        """
+        return localiterbatcher(self)
+
     def capable(self, name):
         '''tell whether repo supports named capability.
         return False if not supported.
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -114,6 +114,25 @@  class remotebatch(peer.batcher):
             encresref.set(encres)
             resref.set(batchable.next())
 
+class remoteiterbatcher(peer.iterbatcher):
+    def __init__(self, remote):
+        super(remoteiterbatcher, self).__init__()
+        self._remote = remote
+
+    def submit(self):
+        """Break the batch request into many batch calls and pipeline them.
+
+        This is mostly valuable over http where request sizes can be
+        limited, but can be used in other places as well.
+        """
+        rb = self._remote.batch()
+        rb.calls = self.calls
+        rb.submit()
+
+    def results(self):
+        for name, args, opts, resref in self.calls:
+            yield resref.value
+
 # Forward a couple of names from peer to make wireproto interactions
 # slightly more sensible.
 batchable = peer.batchable
@@ -193,6 +212,9 @@  class wirepeer(peer.peerrepository):
     def _submitone(self, op, args):
         return self._call(op, **args)
 
+    def iterbatch(self):
+        return remoteiterbatcher(self)
+
     @batchable
     def lookup(self, key):
         self.requirecap('lookup', _('look up remote revision'))
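
The iterbatch() docstring in the patch describes interleaving results with things like progress updates. A minimal, self-contained sketch of that calling pattern follows. The patch does not show how batcher queues calls, so the Batcher stand-in here is an assumption (its __getattr__ queuing and its three-element call records, which omit the resref future). FakePeer and demo() are likewise hypothetical. Only the submit()/results() shape is taken from the patch.

```python
class Batcher(object):
    """Hypothetical stand-in for the batcher base class (not shown in the patch)."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        # Only reached for names that are not real attributes, so any
        # unknown method name is treated as a call to queue for later.
        def call(*args, **opts):
            self.calls.append((name, args, opts))
        return call


class LocalIterBatcher(Batcher):
    """Simplified analogue of localiterbatcher from the patch."""

    def __init__(self, local):
        super(LocalIterBatcher, self).__init__()
        self.local = local

    def submit(self):
        # As in the patch, submit is a no-op for the local case.
        pass

    def results(self):
        # Generator: each queued call runs only when its result is requested.
        for name, args, opts in self.calls:
            yield getattr(self.local, name)(*args, **opts)


class FakePeer(object):
    """Hypothetical peer that records which lookups have actually executed."""

    def __init__(self):
        self.log = []

    def lookup(self, key):
        self.log.append(key)
        return 'node-for-%s' % key

    def iterbatch(self):
        return LocalIterBatcher(self)


def demo():
    peer = FakePeer()
    batch = peer.iterbatch()
    for key in ('a', 'b', 'c'):
        batch.lookup(key)  # queued, not executed yet
    batch.submit()
    seen = []
    for done, result in enumerate(batch.results(), 1):
        # A progress update could be emitted here, between results.
        seen.append((done, result, len(peer.log)))
    return seen


if __name__ == '__main__':
    for done, result, executed in demo():
        print('%d/3 %s (calls executed so far: %d)' % (done, result, executed))
```

In this sketch the third element of each tuple grows by one per iteration, which shows that the caller sees each result before the next call has run. That laziness belongs to the local generator in this sketch. The remoteiterbatcher in the patch submits everything up front and yields the stored values afterwards.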