Patchwork [04,of,22] wireproto: drop the _decompress method in favor a new call type

login
register
mail settings
Submitter Pierre-Yves David
Date March 28, 2014, 10:40 p.m.
Message ID <927f1610e6958d2b1e69.1396046430@marginatus.alto.octopoid.net>
Download mbox | patch
Permalink /patch/4124/
State Accepted
Commit 167047ba3cfa311f520814a74431e0e746d0a4cd
Headers show

Comments

Pierre-Yves David - March 28, 2014, 10:40 p.m.
# HG changeset patch
# User Pierre-Yves David <pierre-yves.david@fb.com>
# Date 1396041853 25200
#      Fri Mar 28 14:24:13 2014 -0700
# Node ID 927f1610e6958d2b1e694e13365e7250eda2f791
# Parent  0bd9c66b3d1184ca318a6828d3fa73b8b1373623
wireproto: drop the _decompress method in favor a new call type

We already have multiple call function for multiple return type. The
`_decompress` function is only used for http and seems like a layer violation.
We drop it in favor of a new call type dedicated to "stream that may be useful to
compress".
Olle Lundberg - March 29, 2014, 6:14 p.m.
On Fri, Mar 28, 2014 at 11:40 PM, <pierre-yves.david@ens-lyon.org> wrote:

> # HG changeset patch
> # User Pierre-Yves David <pierre-yves.david@fb.com>
> # Date 1396041853 25200
> #      Fri Mar 28 14:24:13 2014 -0700
> # Node ID 927f1610e6958d2b1e694e13365e7250eda2f791
> # Parent  0bd9c66b3d1184ca318a6828d3fa73b8b1373623
> wireproto: drop the _decompress method in favor a new call type
>
> We already have multiple call function for multiple return type. The
> `_decompress` function is only used for http and seems like a layer
> violation.
> We drop it in favor of a new call type dedicated to "stream that may be
> useful to
> compress".
>
> diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
> --- a/mercurial/httppeer.py
> +++ b/mercurial/httppeer.py
> @@ -209,14 +209,12 @@ class httppeer(wireproto.wirepeer):
>                  raise util.Abort(err.args[1])
>          finally:
>              fp.close()
>              os.unlink(tempname)
>
> -    def _abort(self, exception):
> -        raise exception
> -
> -    def _decompress(self, stream):
> +    def _callcompressable(self, cmd, **args):
> +        stream =  self._callstream(cmd, **args)
>          return util.chunkbuffer(zgenerator(stream))
>
>  class httpspeer(httppeer):
>      def __init__(self, ui, path):
>          if not url.has_https:
> diff --git a/mercurial/sshpeer.py b/mercurial/sshpeer.py
> --- a/mercurial/sshpeer.py
> +++ b/mercurial/sshpeer.py
> @@ -155,10 +155,13 @@ class sshpeer(wireproto.wirepeer):
>                  self.pipeo.write(v)
>          self.pipeo.flush()
>
>          return self.pipei
>
> +    def _callcompressable(self, cmd, **args):
> +        return self._callstream(cmd, **args)
> +
>      def _call(self, cmd, **args):
>          self._callstream(cmd, **args)
>          return self._recv()
>
>      def _callpush(self, cmd, fp, **args):
> @@ -174,12 +177,10 @@ class sshpeer(wireproto.wirepeer):
>          r = self._recv()
>          if r:
>              return '', r
>          return self._recv(), ''
>
> -    def _decompress(self, stream):
> -        return stream
>
>      def _recv(self):
>          l = self.pipei.readline()
>          if l == '\n':
>              err = []
> diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
> --- a/mercurial/wireproto.py
> +++ b/mercurial/wireproto.py
> @@ -312,32 +312,32 @@ class wirepeer(peer.peerrepository):
>      def stream_out(self):
>          return self._callstream('stream_out')
>
>      def changegroup(self, nodes, kind):
>          n = encodelist(nodes)
> -        f = self._callstream("changegroup", roots=n)
> -        return changegroupmod.unbundle10(self._decompress(f), 'UN')
> +        f = self._callcompressable("changegroup", roots=n)
> +        return changegroupmod.unbundle10(f, 'UN')
>
>      def changegroupsubset(self, bases, heads, kind):
>          self.requirecap('changegroupsubset', _('look up remote changes'))
>          bases = encodelist(bases)
>          heads = encodelist(heads)
> -        f = self._callstream("changegroupsubset",
> -                             bases=bases, heads=heads)
> -        return changegroupmod.unbundle10(self._decompress(f), 'UN')
> +        f = self._callcompressable("changegroupsubset",
> +                                   bases=bases, heads=heads)
> +        return changegroupmod.unbundle10(f, 'UN')
>
>      def getbundle(self, source, heads=None, common=None, bundlecaps=None):
>          self.requirecap('getbundle', _('look up remote changes'))
>          opts = {}
>          if heads is not None:
>              opts['heads'] = encodelist(heads)
>          if common is not None:
>              opts['common'] = encodelist(common)
>          if bundlecaps is not None:
>              opts['bundlecaps'] = ','.join(bundlecaps)
> -        f = self._callstream("getbundle", **opts)
> -        return changegroupmod.unbundle10(self._decompress(f), 'UN')
> +        f = self._callcompressable("getbundle", **opts)
> +        return changegroupmod.unbundle10(f, 'UN')
>
>      def unbundle(self, cg, heads, source):
>          '''Send cg (a readable file-like object representing the
>          changegroup to push, typically a chunkbuffer object) to the
>          remote server as a bundle. Return an integer indicating the
> @@ -386,10 +386,23 @@ class wirepeer(peer.peerrepository):
>          The command is expectid to return a stream.
>
>          return server reply as a file like object."""
>          raise NotImplementedError()
>
> +    def _callcompressable(self, cmd, **args):
> +        """execute <cmd> on the server
> +
> +        The command is expected to return a stream.
> +
> +        The stream may have been compressed in some implemented. This
> function
> +        takes care of the decompression. This is the only difference with
> +        _callstream.
> +
> +        return serve5r replay as a file like object.
>
Do you men 'server' and 'reply' here?

> +        """
> +        raise NotImplementedError()
> +
>      def _callpush(self, cmd, fp, **args):
>          """execute a <cmd> on server
>
>          The command is expected to be related to a push. Push having
> special
>          return method.
> @@ -402,16 +415,10 @@ class wirepeer(peer.peerrepository):
>      def _abort(self, exception):
>          """clearly abort the wire protocol connection and raise the
> exception
>          """
>          raise NotImplementedError()
>
> -
> -    def _decompress(self, stream):
> -        """decompress a received stream
> -        """
> -        raise NotImplementedError()
> -
>  # server side
>
>  # wire protocol command can either return a string or one of this class.
>  class streamres(object):
>      """wireproto reply: binary stream
> _______________________________________________
> Mercurial-devel mailing list
> Mercurial-devel@selenic.com
> http://selenic.com/mailman/listinfo/mercurial-devel
>
Pierre-Yves David - March 31, 2014, 7:56 p.m.
>     @@ -386,10 +386,23 @@ class wirepeer(peer.peerrepository):
>               The command is expectid to return a stream.
>
>               return server reply as a file like object."""
>               raise NotImplementedError()
>
>     +    def _callcompressable(self, cmd, **args):
>     +        """execute <cmd> on the server
>     +
>     +        The command is expected to return a stream.
>     +
>     +        The stream may have been compressed in some implemented.
>     This function
>     +        takes care of the decompression. This is the only
>     difference with
>     +        _callstream.
>     +
>     +        return serve5r replay as a file like object.
>
> Do you men 'server' and 'reply' here?

I do. (fixing a typo during the last pass, adds two…)

Patch

diff --git a/mercurial/httppeer.py b/mercurial/httppeer.py
--- a/mercurial/httppeer.py
+++ b/mercurial/httppeer.py
@@ -209,14 +209,12 @@  class httppeer(wireproto.wirepeer):
                 raise util.Abort(err.args[1])
         finally:
             fp.close()
             os.unlink(tempname)
 
-    def _abort(self, exception):
-        raise exception
-
-    def _decompress(self, stream):
+    def _callcompressable(self, cmd, **args):
+        stream =  self._callstream(cmd, **args)
         return util.chunkbuffer(zgenerator(stream))
 
 class httpspeer(httppeer):
     def __init__(self, ui, path):
         if not url.has_https:
diff --git a/mercurial/sshpeer.py b/mercurial/sshpeer.py
--- a/mercurial/sshpeer.py
+++ b/mercurial/sshpeer.py
@@ -155,10 +155,13 @@  class sshpeer(wireproto.wirepeer):
                 self.pipeo.write(v)
         self.pipeo.flush()
 
         return self.pipei
 
+    def _callcompressable(self, cmd, **args):
+        return self._callstream(cmd, **args)
+
     def _call(self, cmd, **args):
         self._callstream(cmd, **args)
         return self._recv()
 
     def _callpush(self, cmd, fp, **args):
@@ -174,12 +177,10 @@  class sshpeer(wireproto.wirepeer):
         r = self._recv()
         if r:
             return '', r
         return self._recv(), ''
 
-    def _decompress(self, stream):
-        return stream
 
     def _recv(self):
         l = self.pipei.readline()
         if l == '\n':
             err = []
diff --git a/mercurial/wireproto.py b/mercurial/wireproto.py
--- a/mercurial/wireproto.py
+++ b/mercurial/wireproto.py
@@ -312,32 +312,32 @@  class wirepeer(peer.peerrepository):
     def stream_out(self):
         return self._callstream('stream_out')
 
     def changegroup(self, nodes, kind):
         n = encodelist(nodes)
-        f = self._callstream("changegroup", roots=n)
-        return changegroupmod.unbundle10(self._decompress(f), 'UN')
+        f = self._callcompressable("changegroup", roots=n)
+        return changegroupmod.unbundle10(f, 'UN')
 
     def changegroupsubset(self, bases, heads, kind):
         self.requirecap('changegroupsubset', _('look up remote changes'))
         bases = encodelist(bases)
         heads = encodelist(heads)
-        f = self._callstream("changegroupsubset",
-                             bases=bases, heads=heads)
-        return changegroupmod.unbundle10(self._decompress(f), 'UN')
+        f = self._callcompressable("changegroupsubset",
+                                   bases=bases, heads=heads)
+        return changegroupmod.unbundle10(f, 'UN')
 
     def getbundle(self, source, heads=None, common=None, bundlecaps=None):
         self.requirecap('getbundle', _('look up remote changes'))
         opts = {}
         if heads is not None:
             opts['heads'] = encodelist(heads)
         if common is not None:
             opts['common'] = encodelist(common)
         if bundlecaps is not None:
             opts['bundlecaps'] = ','.join(bundlecaps)
-        f = self._callstream("getbundle", **opts)
-        return changegroupmod.unbundle10(self._decompress(f), 'UN')
+        f = self._callcompressable("getbundle", **opts)
+        return changegroupmod.unbundle10(f, 'UN')
 
     def unbundle(self, cg, heads, source):
         '''Send cg (a readable file-like object representing the
         changegroup to push, typically a chunkbuffer object) to the
         remote server as a bundle. Return an integer indicating the
@@ -386,10 +386,23 @@  class wirepeer(peer.peerrepository):
         The command is expectid to return a stream.
 
         return server reply as a file like object."""
         raise NotImplementedError()
 
+    def _callcompressable(self, cmd, **args):
+        """execute <cmd> on the server
+
+        The command is expected to return a stream.
+
+        The stream may have been compressed in some implemented. This function
+        takes care of the decompression. This is the only difference with
+        _callstream.
+
+        return serve5r replay as a file like object.
+        """
+        raise NotImplementedError()
+
     def _callpush(self, cmd, fp, **args):
         """execute a <cmd> on server
 
         The command is expected to be related to a push. Push having special
         return method.
@@ -402,16 +415,10 @@  class wirepeer(peer.peerrepository):
     def _abort(self, exception):
         """clearly abort the wire protocol connection and raise the exception
         """
         raise NotImplementedError()
 
-
-    def _decompress(self, stream):
-        """decompress a received stream
-        """
-        raise NotImplementedError()
-
 # server side
 
 # wire protocol command can either return a string or one of this class.
 class streamres(object):
     """wireproto reply: binary stream